Repository: atlas
Updated Branches:
  refs/heads/branch-1.0 be96cd4aa -> 52719937a


ATLAS-2872: updated HiveHook to purge name cache periodically

(cherry picked from commit 110db1a40b84b9a5ad2590233ee3f03eeae2ec78)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/52719937
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/52719937
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/52719937

Branch: refs/heads/branch-1.0
Commit: 52719937ad971b9ae25148a0e57a8ec3e94dc146
Parents: be96cd4
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Thu Sep 13 18:21:31 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Sat Sep 22 03:37:56 2018 -0700

----------------------------------------------------------------------
 .../atlas/hive/hook/AtlasHiveHookContext.java   |  51 +++++---
 .../org/apache/atlas/hive/hook/HiveHook.java    | 127 ++++++++++++-------
 2 files changed, 113 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
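
The purge behaviour added here is driven by four new hook-side properties (see the HiveHook.java diff below). As a rough illustration only, and not part of this commit, the sketch below lists those keys with the defaults HiveHook falls back to; in a real deployment they would typically be set in the hook's atlas-application.properties rather than built in code, and the class name here is hypothetical.

import java.util.Properties;

// Illustrative only: the property keys introduced by this commit and the
// defaults HiveHook uses when they are absent.
public class HiveHookNameCacheConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        props.setProperty("atlas.hook.hive.name.cache.enabled",                  "true");  // default: true
        props.setProperty("atlas.hook.hive.name.cache.database.count",           "10000"); // default: 10000
        props.setProperty("atlas.hook.hive.name.cache.table.count",              "10000"); // default: 10000
        props.setProperty("atlas.hook.hive.name.cache.rebuild.interval.seconds", "3600");  // default: 60 * 60 seconds

        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}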


http://git-wip-us.apache.org/repos/asf/atlas/blob/52719937/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index a647192..b9e4256 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -19,10 +19,10 @@
 package org.apache.atlas.hive.hook;
 
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -46,12 +46,14 @@ public class AtlasHiveHookContext {
     private final HookContext              hiveContext;
     private final Hive                     hive;
     private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
+    private final HiveHookObjectNamesCache knownObjects;
 
-    public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext) throws Exception {
+    public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects) throws Exception {
         this.hook          = hook;
         this.hiveOperation = hiveOperation;
         this.hiveContext   = hiveContext;
         this.hive          = Hive.get(hiveContext.getConf());
+        this.knownObjects  = knownObjects;
 
         init();
     }
@@ -102,40 +104,47 @@ public class AtlasHiveHookContext {
     }
 
     public boolean isKnownDatabase(String dbQualifiedName) {
-        return hook.isKnownDatabase(dbQualifiedName);
+        return knownObjects != null && dbQualifiedName != null ? knownObjects.isKnownDatabase(dbQualifiedName) : false;
     }
 
     public boolean isKnownTable(String tblQualifiedName) {
-        return hook.isKnownTable(tblQualifiedName);
+        return knownObjects != null && tblQualifiedName != null ? knownObjects.isKnownTable(tblQualifiedName) : false;
     }
 
     public void addToKnownEntities(Collection<AtlasEntity> entities) {
-        hook.addToKnownEntities(entities);
+        if (knownObjects != null && entities != null) {
+            knownObjects.addToKnownEntities(entities);
+        }
     }
 
     public void removeFromKnownDatabase(String dbQualifiedName) {
-        hook.removeFromKnownDatabase(dbQualifiedName);
+        if (knownObjects != null && dbQualifiedName != null) {
+            knownObjects.removeFromKnownDatabase(dbQualifiedName);
+        }
     }
 
     public void removeFromKnownTable(String tblQualifiedName) {
-        hook.removeFromKnownTable(tblQualifiedName);
+        if (knownObjects != null && tblQualifiedName != null) {
+            knownObjects.removeFromKnownTable(tblQualifiedName);
+        }
     }
 
     private void init() {
-        // for create and alter operations, remove output entities from 'known' entity cache
-        String operationName = hiveContext.getOperationName();
-
-        if (operationName != null && operationName.startsWith("CREATE") || operationName.startsWith("ALTER")) {
-            if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) {
-                for (WriteEntity output : hiveContext.getOutputs()) {
-                    switch (output.getType()) {
-                        case DATABASE:
-                            hook.removeFromKnownDatabase(getQualifiedName(output.getDatabase()));
-                            break;
-
-                        case TABLE:
-                            hook.removeFromKnownTable(getQualifiedName(output.getTable()));
-                            break;
+        if (knownObjects != null) {
+            String operationName = hiveContext.getOperationName();
+
+            if (operationName != null && operationName.startsWith("CREATE") || operationName.startsWith("ALTER")) {
+                if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) {
+                    for (WriteEntity output : hiveContext.getOutputs()) {
+                        switch (output.getType()) {
+                            case DATABASE:
+                                knownObjects.removeFromKnownDatabase(getQualifiedName(output.getDatabase()));
+                                break;
+
+                            case TABLE:
+                                knownObjects.removeFromKnownTable(getQualifiedName(output.getTable()));
+                                break;
+                        }
                     }
                 }
             }
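
The AtlasHiveHookContext changes above replace callbacks into static HiveHook methods with delegation to a HiveHookObjectNamesCache instance handed in at construction time, guarded against a null cache (caching disabled) and null names. A minimal standalone sketch of that pattern, under hypothetical class names (NameCacheSketch, HookContextSketch) rather than Atlas APIs:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Only the shape of the pattern mirrors the diff above: the cache is injected
// once per hook invocation, and every delegation is null-guarded so a disabled
// cache simply means "not known".
class NameCacheSketch {
    private final Set<String> knownDatabases = Collections.synchronizedSet(new HashSet<>());

    boolean isKnownDatabase(String qualifiedName) { return knownDatabases.contains(qualifiedName); }
    void    addDatabase(String qualifiedName)     { knownDatabases.add(qualifiedName); }
}

class HookContextSketch {
    private final NameCacheSketch knownObjects; // null when name caching is disabled

    HookContextSketch(NameCacheSketch knownObjects) { this.knownObjects = knownObjects; }

    boolean isKnownDatabase(String qualifiedName) {
        return knownObjects != null && qualifiedName != null && knownObjects.isKnownDatabase(qualifiedName);
    }

    public static void main(String[] args) {
        HookContextSketch withCache     = new HookContextSketch(new NameCacheSketch());
        HookContextSketch cacheDisabled = new HookContextSketch(null);

        System.out.println(withCache.isKnownDatabase("db1@primary"));     // false: nothing cached yet
        System.out.println(cacheDisabled.isKnownDatabase("db1@primary")); // false: caching disabled
    }
}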

http://git-wip-us.apache.org/repos/asf/atlas/blob/52719937/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index b1ffd1d..19075f6 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -21,7 +21,6 @@ package org.apache.atlas.hive.hook;
 import org.apache.atlas.hive.hook.events.*;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.utils.LruCache;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
@@ -34,7 +33,9 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
@@ -44,31 +45,40 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
 public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
-    public static final String CONF_PREFIX                    = "atlas.hook.hive.";
-    public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + "database.name.cache.count";
-    public static final String HOOK_TABLE_NAME_CACHE_COUNT    = CONF_PREFIX + "table.name.cache.count";
-    public static final String CONF_CLUSTER_NAME              = "atlas.cluster.name";
+    public static final String CONF_PREFIX                         = "atlas.hook.hive.";
+    public static final String CONF_CLUSTER_NAME                   = "atlas.cluster.name";
+    public static final String HOOK_NAME_CACHE_ENABLED             = CONF_PREFIX + "name.cache.enabled";
+    public static final String HOOK_NAME_CACHE_DATABASE_COUNT      = CONF_PREFIX + "name.cache.database.count";
+    public static final String HOOK_NAME_CACHE_TABLE_COUNT         = CONF_PREFIX + "name.cache.table.count";
+    public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
 
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
     private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
-    private static final String                     clusterName;
-    private static final Map<String, Long>          knownDatabases;
-    private static final Map<String, Long>          knownTables;
+
+    private static final String  clusterName;
+    private static final boolean nameCacheEnabled;
+    private static final int     nameCacheDatabaseMaxCount;
+    private static final int     nameCacheTableMaxCount;
+    private static final int     nameCacheRebuildIntervalSeconds;
+
+    private static HiveHookObjectNamesCache knownObjects = null;
 
     static {
         for (HiveOperation hiveOperation : HiveOperation.values()) {
             OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
         }
 
-        int dbNameCacheCount  = atlasProperties.getInt(HOOK_DATABASE_NAME_CACHE_COUNT, 10000);
-        int tblNameCacheCount = atlasProperties.getInt(HOOK_TABLE_NAME_CACHE_COUNT, 10000);
+        clusterName                     = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        nameCacheEnabled                = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true);
+        nameCacheDatabaseMaxCount       = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
+        nameCacheTableMaxCount          = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
+        nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
 
-        clusterName    = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-        knownDatabases = dbNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(dbNameCacheCount, 0)) : null;
-        knownTables    = tblNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(tblNameCacheCount, 0)) : null;
+        knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
     }
 
+
     public HiveHook() {
     }
 
@@ -78,9 +88,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
         }
 
+        if (knownObjects != null && knownObjects.isCacheExpired()) {
+            LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount());
+
+            knownObjects = new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds);
+        }
+
         try {
             HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());
-            AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);
+            AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, knownObjects);
 
             BaseHiveEvent event = null;
 
@@ -166,49 +182,72 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return clusterName;
     }
 
-    public boolean isKnownDatabase(String dbQualifiedName) {
-        return knownDatabases != null && dbQualifiedName != null ? knownDatabases.containsKey(dbQualifiedName) : false;
-    }
 
-    public boolean isKnownTable(String tblQualifiedName) {
-        return knownTables != null && tblQualifiedName != null ? knownTables.containsKey(tblQualifiedName) : false;
-    }
+    public static class HiveHookObjectNamesCache {
+        private final int         dbMaxCacheCount;
+        private final int         tblMaxCacheCount;
+        private final long        cacheExpiryTimeMs;
+        private final Set<String> knownDatabases;
+        private final Set<String> knownTables;
 
-    public void addToKnownEntities(Collection<AtlasEntity> entities) {
-        if (knownDatabases != null || knownTables != null) { // caching should be enabled at least for one
-            if (entities != null) {
-                for (AtlasEntity entity : entities) {
-                    if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) {
-                        addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-                    } else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) {
-                        addToKnwnTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-                    }
+        public HiveHookObjectNamesCache(int dbMaxCacheCount, int tblMaxCacheCount, long nameCacheRebuildIntervalSeconds) {
+            this.dbMaxCacheCount   = dbMaxCacheCount;
+            this.tblMaxCacheCount  = tblMaxCacheCount;
+            this.cacheExpiryTimeMs = nameCacheRebuildIntervalSeconds <= 0 ? Long.MAX_VALUE : (System.currentTimeMillis() + (nameCacheRebuildIntervalSeconds * 1000));
+            this.knownDatabases    = Collections.synchronizedSet(new HashSet<>());
+            this.knownTables       = Collections.synchronizedSet(new HashSet<>());
+        }
+
+        public int getCachedDbCount() {
+            return knownDatabases.size();
+        }
+
+        public int getCachedTableCount() {
+            return knownTables.size();
+        }
+
+        public boolean isCacheExpired() {
+            return System.currentTimeMillis() > cacheExpiryTimeMs;
+        }
+
+        public boolean isKnownDatabase(String dbQualifiedName) {
+            return knownDatabases.contains(dbQualifiedName);
+        }
+
+        public boolean isKnownTable(String tblQualifiedName) {
+            return knownTables.contains(tblQualifiedName);
+        }
+
+        public void addToKnownEntities(Collection<AtlasEntity> entities) {
+            for (AtlasEntity entity : entities) {
+                if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) {
+                    addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                } else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) {
+                    addToKnownTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
                 }
             }
         }
-    }
 
-    public void addToKnownDatabase(String dbQualifiedName) {
-        if (knownDatabases != null && dbQualifiedName != null) {
-            knownDatabases.put(dbQualifiedName, System.currentTimeMillis());
+        public void addToKnownDatabase(String dbQualifiedName) {
+            if (knownDatabases.size() < dbMaxCacheCount) {
+                knownDatabases.add(dbQualifiedName);
+            }
         }
-    }
 
-    public void addToKnwnTable(String tblQualifiedName) {
-        if (knownTables != null && tblQualifiedName != null) {
-            knownTables.put(tblQualifiedName, System.currentTimeMillis());
+        public void addToKnownTable(String tblQualifiedName) {
+            if (knownTables.size() < tblMaxCacheCount) {
+                knownTables.add(tblQualifiedName);
+            }
         }
-    }
 
-    public void removeFromKnownDatabase(String dbQualifiedName) {
-        if (knownDatabases != null && dbQualifiedName != null) {
+        public void removeFromKnownDatabase(String dbQualifiedName) {
             knownDatabases.remove(dbQualifiedName);
         }
-    }
 
-    public void removeFromKnownTable(String tblQualifiedName) {
-        if (knownTables != null && tblQualifiedName != null) {
-            knownTables.remove(tblQualifiedName);
+        public void removeFromKnownTable(String tblQualifiedName) {
+            if (tblQualifiedName != null) {
+                knownTables.remove(tblQualifiedName);
+            }
         }
     }
 }
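
Taken together, the cache is never purged entry by entry: each HiveHook.run() checks isCacheExpired() and, once the configured rebuild interval has elapsed, drops the whole HiveHookObjectNamesCache and creates a fresh one. A small self-contained sketch of that purge-by-replacement idea, with illustrative class and method names rather than the Atlas code:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: a bounded set of qualified names with an absolute expiry
// timestamp. The caller "purges" it the same way HiveHook.run() purges
// knownObjects above, by replacing the whole cache once it has expired.
public class ExpiringNameCacheSketch {
    private final int         maxCount;
    private final long        expiryTimeMs;
    private final Set<String> names = Collections.synchronizedSet(new HashSet<>());

    public ExpiringNameCacheSketch(int maxCount, long rebuildIntervalSeconds) {
        this.maxCount     = maxCount;
        this.expiryTimeMs = rebuildIntervalSeconds <= 0
                ? Long.MAX_VALUE                                                  // never expire
                : System.currentTimeMillis() + (rebuildIntervalSeconds * 1000L);  // now + interval
    }

    public boolean isExpired()           { return System.currentTimeMillis() > expiryTimeMs; }
    public boolean contains(String name) { return names.contains(name); }

    public void add(String name) {
        if (names.size() < maxCount) { // bounded: stop caching once the cap is reached
            names.add(name);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExpiringNameCacheSketch cache = new ExpiringNameCacheSketch(10_000, 1);

        cache.add("db1@primary");
        Thread.sleep(1_500);                                // let the 1-second interval elapse

        if (cache.isExpired()) {
            cache = new ExpiringNameCacheSketch(10_000, 1); // purge by replacement
        }

        System.out.println(cache.contains("db1@primary"));  // prints: false
    }
}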
