Repository: atlas
Updated Branches:
  refs/heads/master 4128f5d2a -> d5f46e3f5
ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage (cherry picked from commit 20215f3dd74b16fe4a7a9c8eb21b17925256f4f9) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/d5f46e3f Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/d5f46e3f Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/d5f46e3f Branch: refs/heads/master Commit: d5f46e3f51753dfd85ddc76ffb6f227eb83285fc Parents: 4128f5d Author: Madhan Neethiraj <mad...@apache.org> Authored: Wed Sep 19 11:51:52 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Mon Oct 1 08:44:27 2018 -0700 ---------------------------------------------------------------------- .../atlas/hive/hook/AtlasHiveHookContext.java | 8 ++ .../org/apache/atlas/hive/hook/HiveHook.java | 15 +++ .../hive/hook/events/CreateHiveProcess.java | 26 ++++- .../apache/atlas/kafka/AtlasKafkaConsumer.java | 9 +- .../notification/NotificationHookConsumer.java | 102 ++++++++++++++++++- 5 files changed, 155 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/d5f46e3f/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index b9e4256..b467f4c 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -85,6 +85,14 @@ public class AtlasHiveHookContext { return hook.getClusterName(); } + public boolean getSkipHiveColumnLineageHive20633() { + return hook.getSkipHiveColumnLineageHive20633(); 
+ } + + public int getSkipHiveColumnLineageHive20633InputsThreshold() { + return hook.getSkipHiveColumnLineageHive20633InputsThreshold(); + } + public String getQualifiedName(Database db) { return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/d5f46e3f/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 19075f6..4bb3b65 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -51,6 +51,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count"; public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count"; public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds"; + public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633"; + public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold"; public static final String DEFAULT_CLUSTER_NAME = "primary"; @@ -62,6 +64,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final int nameCacheTableMaxCount; private static final int nameCacheRebuildIntervalSeconds; + private static final boolean skipHiveColumnLineageHive20633; + private static final int skipHiveColumnLineageHive20633InputsThreshold; + private static HiveHookObjectNamesCache knownObjects = null; static { @@ 
-74,6 +79,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000); nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default + skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); + skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null; } @@ -182,6 +189,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return clusterName; } + public boolean getSkipHiveColumnLineageHive20633() { + return skipHiveColumnLineageHive20633; + } + + public int getSkipHiveColumnLineageHive20633InputsThreshold() { + return skipHiveColumnLineageHive20633InputsThreshold; + } + public static class HiveHookObjectNamesCache { private final int dbMaxCacheCount; http://git-wip-us.apache.org/repos/asf/atlas/blob/d5f46e3f/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index ae01d50..6d31b10 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -136,10 +136,18 @@ public class CreateHiveProcess extends BaseHiveEvent { return; } + 
final List<AtlasEntity> columnLineages = new ArrayList<>(); + boolean isSameInputsSize = true; + int lineageInputsSize = -1; + for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) { String outputColName = getQualifiedName(entry.getKey()); AtlasEntity outputColumn = context.getEntity(outputColName); + if (LOG.isDebugEnabled()) { + LOG.debug("processColumnLineage(): DependencyKey={}; Dependency={}", entry.getKey(), entry.getValue()); + } + if (outputColumn == null) { LOG.warn("column-lineage: non-existing output-column {}", outputColName); @@ -165,6 +173,12 @@ public class CreateHiveProcess extends BaseHiveEvent { continue; } + if (lineageInputsSize == -1) { + lineageInputsSize = inputColumns.size(); + } else if (lineageInputsSize != inputColumns.size()) { + isSameInputsSize = false; + } + AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE); columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); @@ -175,7 +189,17 @@ public class CreateHiveProcess extends BaseHiveEvent { columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr()); - entities.addEntity(columnLineageProcess); + columnLineages.add(columnLineageProcess); + } + + boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && isSameInputsSize && lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold(); + + if (!skipColumnLineage) { + for (AtlasEntity columnLineage : columnLineages) { + entities.addEntity(columnLineage); + } + } else { + LOG.warn("skipping {} hive_column_lineage entities, each having {} inputs", columnLineages.size(), lineageInputsSize); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/d5f46e3f/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java 
---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index fd0c4e4..5c840c3 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -75,7 +75,14 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { record.topic(), record.partition(), record.offset(), record.key(), record.value()); } - T message = deserializer.deserialize(record.value().toString()); + T message = null; + + try { + message = deserializer.deserialize(record.value().toString()); + } catch (OutOfMemoryError excp) { + LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}", + record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp); + } if (message == null) { continue; http://git-wip-us.apache.org/repos/asf/atlas/blob/d5f46e3f/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 9f832b9..76fbef0 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -29,8 +29,9 @@ import org.apache.atlas.RequestContext; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.listener.ActiveStateChangeHandler; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity; import 
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; @@ -55,6 +56,7 @@ import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.filters.AuditFilter.AuditLog; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +65,11 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -82,6 +88,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final int SC_OK = 200; private static final int SC_BAD_REQUEST = 400; + private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage"; + private static final String ATTRIBUTE_INPUTS = "inputs"; + private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; @@ -91,6 +100,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; + public 
static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633"; + public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold"; + public static final int SERVER_READY_WAIT_TIME_MS = 1000; private final AtlasEntityStore atlasEntityStore; @@ -101,6 +113,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final int failedMsgCacheSize; private final int minWaitDuration; private final int maxWaitDuration; + private final boolean skipHiveColumnLineageHive20633; + private final int skipHiveColumnLineageHive20633InputsThreshold; private NotificationInterface notificationInterface; private ExecutorService executors; @@ -124,10 +138,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl this.applicationProperties = ApplicationProperties.get(); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); - failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); + failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1); consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default + + skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); + skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default + + LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); + 
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); } @Override @@ -367,6 +387,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return; } + preProcessNotificationMessage(kafkaMsg); + // Used for intermediate conversions during create and update for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { @@ -636,6 +658,80 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } + private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) { + skipHiveColumnLineage(kafkaMsg); + } + + private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotification> kafkaMessage) { + if (!skipHiveColumnLineageHive20633) { + return; + } + + final HookNotification message = kafkaMessage.getMessage(); + final AtlasEntitiesWithExtInfo entities; + + switch (message.getType()) { + case ENTITY_CREATE_V2: + entities = ((EntityCreateRequestV2) message).getEntities(); + break; + + case ENTITY_FULL_UPDATE_V2: + entities = ((EntityUpdateRequestV2) message).getEntities(); + break; + + default: + entities = null; + break; + } + + if (entities != null && entities.getEntities() != null) { + boolean isSameInputsSize = true; + int lineageInputsSize = -1; + int lineageCount = 0; + + // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases + for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next(); + + if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { + Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS); + + if (objInputs instanceof Collection) { + Collection inputs = (Collection) objInputs; + + lineageCount++; + + if (lineageInputsSize == -1) { // first entry + lineageInputsSize = 
inputs.size(); + } else if (inputs.size() != lineageInputsSize) { + isSameInputsSize = false; + + break; + } + } + } + } + + if (lineageCount > 1 && isSameInputsSize && lineageInputsSize > skipHiveColumnLineageHive20633InputsThreshold) { + int numRemovedEntities = 0; + + for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next(); + + if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { + iter.remove(); + + numRemovedEntities++; + } + } + + if (numRemovedEntities > 0) { + LOG.warn("removed {} hive_column_lineage entities, each having {} inputs. offset={}, partition={}", numRemovedEntities, lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition()); + } + } + } + } + static class FailedCommitOffsetRecorder { private Long currentOffset;