Repository: atlas Updated Branches: refs/heads/branch-1.0 d12cb7c4d -> de172af37
ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage - #3 (cherry picked from commit 82e04037220a82ab985928221bc72ade80fc3be2) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/de172af3 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/de172af3 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/de172af3 Branch: refs/heads/branch-1.0 Commit: de172af372d84a34076f05022ddac43e82182444 Parents: d12cb7c Author: Madhan Neethiraj <mad...@apache.org> Authored: Fri Oct 12 08:41:07 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Fri Oct 12 14:34:56 2018 -0700 ---------------------------------------------------------------------- .../org/apache/atlas/hive/hook/HiveHook.java | 2 +- .../hive/hook/events/CreateHiveProcess.java | 16 +++++-------- .../notification/NotificationHookConsumer.java | 25 ++++++++------------ 3 files changed, 17 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/de172af3/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 4bb3b65..a9570e1 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -80,7 +80,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); - skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default + skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null; } http://git-wip-us.apache.org/repos/asf/atlas/blob/de172af3/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 7dd5f1d..6d2517c 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -136,9 +136,8 @@ public class CreateHiveProcess extends BaseHiveEvent { return; } - final List<AtlasEntity> columnLineages = new ArrayList<>(); - boolean isSameInputsSize = true; - int lineageInputsSize = -1; + final List<AtlasEntity> columnLineages = new ArrayList<>(); + int lineageInputsCount = 0; for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) { String outputColName = getQualifiedName(entry.getKey()); @@ -173,11 +172,7 @@ public class CreateHiveProcess extends BaseHiveEvent { continue; } - if (lineageInputsSize == -1) { - lineageInputsSize = inputColumns.size(); - } else if (lineageInputsSize != inputColumns.size()) { - isSameInputsSize = false; - } + lineageInputsCount += inputColumns.size(); AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE); @@ -192,14 +187,15 @@ public class CreateHiveProcess extends BaseHiveEvent { columnLineages.add(columnLineageProcess); } - boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && columnLineages.size() > 1 && isSameInputsSize && lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold(); + float avgInputsCount = columnLineages.size() > 0 ? (((float) lineageInputsCount) / columnLineages.size()) : 0; + boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && avgInputsCount > context.getSkipHiveColumnLineageHive20633InputsThreshold(); if (!skipColumnLineage) { for (AtlasEntity columnLineage : columnLineages) { entities.addEntity(columnLineage); } } else { - LOG.warn("skipping {} hive_column_lineage entities, each having {} inputs", columnLineages.size(), lineageInputsSize); + LOG.warn("skipped {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}", columnLineages.size(), avgInputsCount, context.getSkipHiveColumnLineageHive20633InputsThreshold(), lineageInputsCount); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/de172af3/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 76fbef0..a8f8dd6 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -144,7 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); - skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default + skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); @@ -685,34 +685,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (entities != null && entities.getEntities() != null) { - boolean isSameInputsSize = true; - int lineageInputsSize = -1; - int lineageCount = 0; + int lineageCount = 0; + int lineageInputsCount = 0; // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next(); if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { + lineageCount++; + Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS); if (objInputs instanceof Collection) { Collection inputs = (Collection) objInputs; - lineageCount++; - - if (lineageInputsSize == -1) { // first entry - lineageInputsSize = inputs.size(); - } else if (inputs.size() != lineageInputsSize) { - isSameInputsSize = false; - - break; - } + lineageInputsCount += inputs.size(); } } } - if (lineageCount > 1 && isSameInputsSize && lineageInputsSize > skipHiveColumnLineageHive20633InputsThreshold) { + float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0; + + if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) { int numRemovedEntities = 0; for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { @@ -726,7 +721,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (numRemovedEntities > 0) { - LOG.warn("removed {} hive_column_lineage entities, each having {} inputs. offset={}, partition={}", numRemovedEntities, lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition()); + LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition()); } } }