Repository: atlas
Updated Branches:
  refs/heads/branch-1.0 d12cb7c4d -> de172af37


ATLAS-2891: updated hook notification processing with option to ignore 
potentially incorrect hive_column_lineage - #3

(cherry picked from commit 82e04037220a82ab985928221bc72ade80fc3be2)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/de172af3
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/de172af3
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/de172af3

Branch: refs/heads/branch-1.0
Commit: de172af372d84a34076f05022ddac43e82182444
Parents: d12cb7c
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Fri Oct 12 08:41:07 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Fri Oct 12 14:34:56 2018 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/hive/hook/HiveHook.java    |  2 +-
 .../hive/hook/events/CreateHiveProcess.java     | 16 +++++--------
 .../notification/NotificationHookConsumer.java  | 25 ++++++++------------
 3 files changed, 17 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/de172af3/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 4bb3b65..a9570e1 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -80,7 +80,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         nameCacheTableMaxCount          = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
         nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
         skipHiveColumnLineageHive20633                = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
-        skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default
+        skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
 
         knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/de172af3/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 7dd5f1d..6d2517c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -136,9 +136,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
             return;
         }
 
-        final List<AtlasEntity> columnLineages    = new ArrayList<>();
-        boolean                 isSameInputsSize  = true;
-        int                     lineageInputsSize = -1;
+        final List<AtlasEntity> columnLineages     = new ArrayList<>();
+        int                     lineageInputsCount = 0;
 
         for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) {
             String      outputColName = getQualifiedName(entry.getKey());
@@ -173,11 +172,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
                 continue;
             }
 
-            if (lineageInputsSize == -1) {
-                lineageInputsSize = inputColumns.size();
-            } else if (lineageInputsSize != inputColumns.size()) {
-                isSameInputsSize = false;
-            }
+            lineageInputsCount += inputColumns.size();
 
             AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
 
@@ -192,14 +187,15 @@ public class CreateHiveProcess extends BaseHiveEvent {
             columnLineages.add(columnLineageProcess);
         }
 
-        boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && columnLineages.size() > 1 && isSameInputsSize && lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold();
+        float   avgInputsCount    = columnLineages.size() > 0 ? (((float) lineageInputsCount) / columnLineages.size()) : 0;
+        boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && avgInputsCount > context.getSkipHiveColumnLineageHive20633InputsThreshold();
 
         if (!skipColumnLineage) {
             for (AtlasEntity columnLineage : columnLineages) {
                 entities.addEntity(columnLineage);
             }
         } else {
-            LOG.warn("skipping {} hive_column_lineage entities, each having {} inputs", columnLineages.size(), lineageInputsSize);
+            LOG.warn("skipped {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}", columnLineages.size(), avgInputsCount, context.getSkipHiveColumnLineageHive20633InputsThreshold(), lineageInputsCount);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/de172af3/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 76fbef0..a8f8dd6 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -144,7 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         maxWaitDuration       = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
 
         skipHiveColumnLineageHive20633                = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
-        skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default
+        skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -685,34 +685,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         if (entities != null && entities.getEntities() != null) {
-            boolean isSameInputsSize  = true;
-            int     lineageInputsSize = -1;
-            int     lineageCount      = 0;
+            int lineageCount       = 0;
+            int lineageInputsCount = 0;
 
             // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
             for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
                 AtlasEntity entity = iter.next();
 
                 if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
+                    lineageCount++;
+
                     Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);
 
                     if (objInputs instanceof Collection) {
                         Collection inputs = (Collection) objInputs;
 
-                        lineageCount++;
-
-                        if (lineageInputsSize == -1) { // first entry
-                            lineageInputsSize = inputs.size();
-                        } else if (inputs.size() != lineageInputsSize) {
-                            isSameInputsSize = false;
-
-                            break;
-                        }
+                        lineageInputsCount += inputs.size();
                     }
                 }
             }
 
-            if (lineageCount > 1 && isSameInputsSize && lineageInputsSize > skipHiveColumnLineageHive20633InputsThreshold) {
+            float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
+
+            if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
                 int numRemovedEntities = 0;
 
                 for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
@@ -726,7 +721,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                 }
 
                 if (numRemovedEntities > 0) {
-                    LOG.warn("removed {} hive_column_lineage entities, each having {} inputs. offset={}, partition={}", numRemovedEntities, lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                    LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
                 }
             }
         }

Reply via email to