This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new fc2a926 ATLAS-3054: improved batch processing in notification handler to avoid processing of an entity multiple times - #2 fc2a926 is described below commit fc2a926cc7d0f0070c25f9afc68bf6a5a1bb6df2 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Mon Mar 11 16:17:03 2019 -0700 ATLAS-3054: improved batch processing in notification handler to avoid processing of an entity multiple times - #2 --- addons/models/1000-Hadoop/1030-hive_model.json | 2 +- .../org/apache/atlas/type/AtlasBuiltInTypes.java | 13 +- .../org/apache/atlas/type/AtlasEntityType.java | 4 +- .../apache/atlas/type/AtlasRelationshipType.java | 7 +- .../org/apache/atlas/type/AtlasStructType.java | 37 ++++- .../org/apache/atlas/utils/AtlasEntityUtil.java | 33 ++++- .../atlas/discovery/EntityDiscoveryService.java | 19 +-- .../converters/AtlasStructFormatConverter.java | 9 +- .../store/graph/v2/AtlasEntityStoreV2.java | 2 +- .../store/graph/v2/EntityGraphMapper.java | 79 ++++++++--- .../notification/NotificationHookConsumer.java | 153 ++++++++++++++++++++- .../preprocessor/HivePreprocessor.java | 4 +- .../preprocessor/PreprocessorContext.java | 4 + 13 files changed, 305 insertions(+), 61 deletions(-) diff --git a/addons/models/1000-Hadoop/1030-hive_model.json b/addons/models/1000-Hadoop/1030-hive_model.json index 324d716..7207a41 100644 --- a/addons/models/1000-Hadoop/1030-hive_model.json +++ b/addons/models/1000-Hadoop/1030-hive_model.json @@ -508,7 +508,7 @@ "serviceType": "hive", "typeVersion": "1.2", "relationshipCategory": "COMPOSITION", - "relationshipLabel": "__hive_table.partitionkeys", + "relationshipLabel": "__hive_table.partitionKeys", "endDef1": { "type": "hive_table", "name": "partitionKeys", diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java index 6bedf6d..ce14b5b 100644 --- 
a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java @@ -20,6 +20,7 @@ package org.apache.atlas.type; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; @@ -672,19 +673,25 @@ public class AtlasBuiltInTypes { @Override public AtlasObjectId getNormalizedValue(Object obj) { + AtlasObjectId ret = null; + if (obj != null) { if (obj instanceof AtlasObjectId) { - return (AtlasObjectId) obj; + ret = (AtlasObjectId) obj; } else if (obj instanceof Map) { Map map = (Map) obj; if (isValidMap(map)) { - return new AtlasObjectId(map); + if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) { + ret = new AtlasRelatedObjectId(map); + } else { + ret = new AtlasObjectId(map); + } } } } - return null; + return ret; } private boolean isValidMap(Map map) { diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java index 2557bb3..b5360c1 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java @@ -799,7 +799,7 @@ public class AtlasEntityType extends AtlasStructType { AtlasEntity entityObj = (AtlasEntity) obj; for (String attributeName : relationshipAttributes.keySet()) { - Object value = entityObj.getAttribute(attributeName); + Object value = entityObj.getRelationshipAttribute(attributeName); String relationshipType = AtlasEntityUtil.getRelationshipType(value); AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType); @@ -824,7 +824,7 @@ public class AtlasEntityType extends AtlasStructType { } } } else if (obj instanceof Map) { - Map attributes = 
AtlasTypeUtil.toStructAttributes((Map) obj); + Map attributes = AtlasTypeUtil.toRelationshipAttributes((Map) obj); for (String attributeName : relationshipAttributes.keySet()) { Object value = attributes.get(attributeName); diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java index 3ea8d80..183772b 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java @@ -159,7 +159,9 @@ public class AtlasRelationshipType extends AtlasStructType { AtlasRelationshipEdgeDirection end2Direction = IN; if (endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) { - end2Direction = OUT; + if (relationshipDef.getRelationshipLabel() == null) { // only if label hasn't been overridden + end2Direction = OUT; + } } else if (!endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) { end1Direction = IN; end2Direction = OUT; @@ -345,11 +347,12 @@ public class AtlasRelationshipType extends AtlasStructType { } attribute = new AtlasAttribute(entityType, attributeDef, - typeRegistry.getType(attrTypeName), relationshipLabel); + typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel); } else { // attribute already exists (legacy attribute which is also a relationship attribute) // add relationshipLabel information to existing attribute + attribute.setRelationshipName(getTypeName()); attribute.setRelationshipEdgeLabel(relationshipLabel); } diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java index 84c76d7..0be7e18 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.util.*; +import static org.apache.atlas.model.TypeCategory.*; import static 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE; import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF; import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF; @@ -701,19 +702,23 @@ public class AtlasStructType extends AtlasType { private final String vertexPropertyName; private final String vertexUniquePropertyName; private final boolean isOwnedRef; + private final boolean isObjectRef; private final String inverseRefAttributeName; private AtlasAttribute inverseRefAttribute; + private String relationshipName; private String relationshipEdgeLabel; private AtlasRelationshipEdgeDirection relationshipEdgeDirection; - public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipLabel) { + public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipName, String relationshipLabel) { this.definedInType = definedInType; this.attributeDef = attrDef; this.attributeType = attributeType.getTypeForAttribute(); this.qualifiedName = getQualifiedAttributeName(definedInType.getStructDef(), attributeDef.getName()); this.vertexPropertyName = encodePropertyKey(this.qualifiedName); this.vertexUniquePropertyName = attrDef.getIsUnique() ? 
encodePropertyKey(getQualifiedAttributeName(definedInType.getStructDef(), UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX + attributeDef.getName())) : null; + this.relationshipName = relationshipName; this.relationshipEdgeLabel = getRelationshipEdgeLabel(relationshipLabel); + boolean isOwnedRef = false; String inverseRefAttribute = null; @@ -736,10 +741,32 @@ public class AtlasStructType extends AtlasType { this.isOwnedRef = isOwnedRef; this.inverseRefAttributeName = inverseRefAttribute; this.relationshipEdgeDirection = AtlasRelationshipEdgeDirection.OUT; + + switch (attributeType.getTypeCategory()) { + case OBJECT_ID_TYPE: + isObjectRef = true; + break; + + case MAP: + AtlasMapType mapType = (AtlasMapType) attributeType; + + isObjectRef = mapType.getValueType().getTypeCategory() == OBJECT_ID_TYPE; + break; + + case ARRAY: + AtlasArrayType arrayType = (AtlasArrayType) attributeType; + + isObjectRef = arrayType.getElementType().getTypeCategory() == OBJECT_ID_TYPE; + break; + + default: + isObjectRef = false; + break; + } } public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType) { - this(definedInType, attrDef, attributeType, null); + this(definedInType, attrDef, attributeType, null, null); } public AtlasStructType getDefinedInType() { return definedInType; } @@ -766,12 +793,18 @@ public class AtlasStructType extends AtlasType { public boolean isOwnedRef() { return isOwnedRef; } + public boolean isObjectRef() { return isObjectRef; } + public String getInverseRefAttributeName() { return inverseRefAttributeName; } public AtlasAttribute getInverseRefAttribute() { return inverseRefAttribute; } public void setInverseRefAttribute(AtlasAttribute inverseAttr) { inverseRefAttribute = inverseAttr; } + public String getRelationshipName() { return relationshipName; } + + public void setRelationshipName(String relationshipName) { this.relationshipName = relationshipName; } + public String getRelationshipEdgeLabel() { return 
relationshipEdgeLabel; } public void setRelationshipEdgeLabel(String relationshipEdgeLabel) { this.relationshipEdgeLabel = relationshipEdgeLabel; } diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java index 3002217..1e78e25 100644 --- a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java +++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -126,10 +127,38 @@ public class AtlasEntityUtil { if (val instanceof AtlasRelatedObjectId) { ret = ((AtlasRelatedObjectId) val).getRelationshipType(); + } else if (val instanceof Collection) { + String elemRelationshipType = null; + + for (Object elem : (Collection) val) { + elemRelationshipType = getRelationshipType(elem); + + if (elemRelationshipType != null) { + break; + } + } + + ret = elemRelationshipType; } else if (val instanceof Map) { - Object relTypeName = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE); + Map mapValue = (Map) val; + + if (mapValue.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) { + Object relTypeName = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE); - ret = relTypeName != null ? relTypeName.toString() : null; + ret = relTypeName != null ? 
relTypeName.toString() : null; + } else { + String entryRelationshipType = null; + + for (Object entryVal : mapValue.values()) { + entryRelationshipType = getRelationshipType(entryVal); + + if (entryRelationshipType != null) { + break; + } + } + + ret = entryRelationshipType; + } } else { ret = null; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 9df360c..19f81d3 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -537,7 +537,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { AtlasAttribute attribute = entityType.getAttribute(relation); if (attribute != null) { - if (isRelationshipAttribute(attribute)) { + if (attribute.isObjectRef()) { relation = attribute.getRelationshipEdgeLabel(); } else { throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName()); @@ -790,23 +790,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return ""; } - private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException { - boolean ret = true; - AtlasType attrType = attribute.getAttributeType(); - - if (attrType.getTypeCategory() == ARRAY) { - attrType = ((AtlasArrayType) attrType).getElementType(); - } else if (attrType.getTypeCategory() == MAP) { - attrType = ((AtlasMapType) attrType).getValueType(); - } - - if (attrType.getTypeCategory() != OBJECT_ID_TYPE) { - ret = false; - } - - return ret; - } - private Set<String> getEntityStates() { return new HashSet<>(Arrays.asList(ACTIVE.toString(), DELETED.toString())); } diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java 
b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java index 51a6426..173fcee 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java @@ -23,6 +23,7 @@ import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.utils.AtlasEntityUtil; import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.type.*; import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; @@ -134,11 +135,12 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { // Only process the requested/set attributes for (String attrName : attributes.keySet()) { - AtlasAttribute attr = structType.getAttribute(attrName); + Object v2Value = attributes.get(attrName); + AtlasAttribute attr = structType.getAttribute(attrName); if (attr == null) { if (isEntityType) { - attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, null); + attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(v2Value)); } if (attr == null) { @@ -149,7 +151,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { AtlasType attrType = attr.getAttributeType(); AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); - Object v2Value = attributes.get(attr.getName()); if (v2Value != null && isEntityType && attr.isOwnedRef()) { if (LOG.isDebugEnabled()) { @@ -256,6 +257,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { // Only process the requested/set attributes for (Object attribKey : attributes.keySet()) { String attrName = attribKey.toString(); + Object v1Value = 
attributes.get(attrName); AtlasAttribute attr = structType.getAttribute(attrName); if (attr == null) { @@ -271,7 +273,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { AtlasType attrType = attr.getAttributeType(); AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); - Object v1Value = attributes.get(attrName); if (attrConverter.isValidValueV1(v1Value, attrType)) { Object v2Value = attrConverter.fromV1ToV2(v1Value, attrType, context); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index a62f335..a5a6291 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -323,7 +323,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { AtlasAttribute attr = entityType.getAttribute(attrName); if (attr == null) { - attr = entityType.getRelationshipAttribute(attrName, null); + attr = entityType.getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(attrValue)); if (attr == null) { throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index a6f1250..31b20ff 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -79,8 +79,43 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DE import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE; 
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET; -import static org.apache.atlas.repository.Constants.*; -import static org.apache.atlas.repository.graph.GraphHelper.*; +import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY; +import static org.apache.atlas.repository.Constants.CREATED_BY_KEY; +import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.HOME_ID_KEY; +import static org.apache.atlas.repository.Constants.IS_PROXY_KEY; +import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; +import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY; +import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY; +import static 
org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY; +import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex; +import static org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations; +import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty; +import static org.apache.atlas.repository.graph.GraphHelper.getStatus; +import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; +import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames; +import static org.apache.atlas.repository.graph.GraphHelper.isActive; +import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled; +import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge; +import static org.apache.atlas.repository.graph.GraphHelper.string; +import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; @@ -827,27 +862,29 @@ public class EntityGraphMapper { throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? 
null : ctx.getValue().toString())); } - String attributeName = ctx.getAttribute().getName(); - AtlasType type = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex)); - - AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection(); + AtlasType type = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex)); if (type instanceof AtlasEntityType) { AtlasEntityType entityType = (AtlasEntityType) type; + AtlasAttribute attribute = ctx.getAttribute(); + String attributeName = attribute.getName(); // use relationship to create/update edges if (entityType.hasRelationshipAttribute(attributeName)) { Map<String, Object> relationshipAttributes = getRelationshipAttributes(ctx.getValue()); if (ctx.getCurrentEdge() != null) { - ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, edgeDirection, relationshipAttributes); - + ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, attribute.getRelationshipEdgeDirection(), relationshipAttributes); } else { - String relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName); + String relationshipName = attribute.getRelationshipName(); AtlasVertex fromVertex; AtlasVertex toVertex; - if (edgeDirection == IN) { + if (StringUtils.isEmpty(relationshipName)) { + relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName); + } + + if (attribute.getRelationshipEdgeDirection() == IN) { fromVertex = attributeVertex; toVertex = entityVertex; @@ -1106,21 +1143,29 @@ public class EntityGraphMapper { } private static AtlasObjectId getObjectId(Object val) throws AtlasBaseException { + AtlasObjectId ret = null; + if (val != null) { if ( val instanceof AtlasObjectId) { - return ((AtlasObjectId) val); + ret = ((AtlasObjectId) val); } else if (val instanceof Map) { - AtlasObjectId ret = new AtlasObjectId((Map)val); + Map map = (Map) val; - if (AtlasTypeUtil.isValid(ret)) { - 
return ret; + if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) { + ret = new AtlasRelatedObjectId(map); + } else { + ret = new AtlasObjectId((Map) val); } - } - throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString()); + if (!AtlasTypeUtil.isValid(ret)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString()); + } + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString()); + } } - return null; + return ret; } private static String getGuid(Object val) throws AtlasBaseException { diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index d16d544..d1d6003 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -35,6 +35,7 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2; @@ -56,12 +57,14 @@ import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.service.Service; import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.web.filters.AuditFilter; import 
org.apache.atlas.web.filters.AuditFilter.AuditLog; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.kafka.common.TopicPartition; @@ -75,6 +78,7 @@ import javax.inject.Inject; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -86,6 +90,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import static org.apache.atlas.model.instance.AtlasObjectId.*; +import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS; + /** * Consumer of notifications from hooks e.g., hive hook etc. @@ -174,7 +181,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default - commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0); + commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50); skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 @@ -216,8 +223,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveTablesCache = Collections.emptyMap(); } - 
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, false); - rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, false); + hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true); + rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true); preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs; LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); @@ -704,6 +711,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) { atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate); } else { + Map<String, String> guidAssignments = new HashMap<>(); + for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) { int toIndex = fromIdx + commitBatchSize; @@ -711,10 +720,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl toIndex = entitiesList.size(); } - AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(new ArrayList<>(entitiesList.subList(fromIdx, toIndex))); + List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex)); + + updateProcessedEntityReferences(entitiesBatch, guidAssignments); + + AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(entitiesBatch); AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream); - atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate); + EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate); + + recordProcessedEntities(response, 
guidAssignments); RequestContext.get().resetEntityGuidUpdates(); @@ -801,7 +816,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs); - if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs) { + if (context.isHivePreprocessEnabled()) { preprocessHiveTypes(context); } @@ -814,6 +829,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } context.moveRegisteredReferredEntities(); + + if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) { + // move hive_process and hive_column_lineage entities to end of the list + List<AtlasEntity> entities = context.getEntities(); + int count = entities.size(); + + for (int i = 0; i < count; i++) { + AtlasEntity entity = entities.get(i); + + switch (entity.getTypeName()) { + case TYPE_HIVE_PROCESS: + case TYPE_HIVE_COLUMN_LINEAGE: + entities.remove(i--); + entities.add(entity); + count--; + break; + } + } + + if (entities.size() - count > 0) { + LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size()); + } + } } private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) { @@ -896,7 +934,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (lineageQNames.contains(qualifiedName)) { entities.remove(i--); - LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition()); + LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. 
topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition()); numRemovedEntities++; @@ -965,6 +1003,107 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return ret; } + private void recordProcessedEntities(EntityMutationResponse mutationResponse, Map<String, String> guidAssignments) { + if (mutationResponse != null && MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { + guidAssignments.putAll(mutationResponse.getGuidAssignments()); + + if (LOG.isDebugEnabled()) { + LOG.debug("recordProcessedEntities: added {} guidAssignments. updatedSize={}", mutationResponse.getGuidAssignments().size(), guidAssignments.size()); + } + } + } + + private void updateProcessedEntityReferences(List<AtlasEntity> entities, Map<String, String> guidAssignments) { + if (CollectionUtils.isNotEmpty(entities) && MapUtils.isNotEmpty(guidAssignments)) { + for (AtlasEntity entity : entities) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (entityType == null) { + continue; + } + + if (MapUtils.isNotEmpty(entity.getAttributes())) { + for (Map.Entry<String, Object> entry : entity.getAttributes().entrySet()) { + String attrName = entry.getKey(); + Object attrValue = entry.getValue(); + + if (attrValue == null) { + continue; + } + + AtlasAttribute attribute = entityType.getAttribute(attrName); + + if (attribute == null) { // look for a relationship attribute with the same name + attribute = entityType.getRelationshipAttribute(attrName, null); + } + + if (attribute != null && attribute.isObjectRef()) { + updateProcessedEntityReferences(attrValue, guidAssignments); + } + } + } + + if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { + for (Map.Entry<String, Object> entry : entity.getRelationshipAttributes().entrySet()) { + Object attrValue = entry.getValue(); + + if (attrValue != null) { + updateProcessedEntityReferences(attrValue, guidAssignments); + } 
+ } + } + } + } + } + + private void updateProcessedEntityReferences(Object objVal, Map<String, String> guidAssignments) { + if (objVal instanceof AtlasObjectId) { + updateProcessedEntityReferences((AtlasObjectId) objVal, guidAssignments); + } else if (objVal instanceof Collection) { + updateProcessedEntityReferences((Collection) objVal, guidAssignments); + } else if (objVal instanceof Map) { + updateProcessedEntityReferences((Map) objVal, guidAssignments); + } + } + + private void updateProcessedEntityReferences(AtlasObjectId objId, Map<String, String> guidAssignments) { + String guid = objId.getGuid(); + + if (guid != null && guidAssignments.containsKey(guid)) { + String assignedGuid = guidAssignments.get(guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.getTypeName(), guid, assignedGuid); + } + + objId.setGuid(assignedGuid); + objId.setTypeName(null); + objId.setUniqueAttributes(null); + } + } + + private void updateProcessedEntityReferences(Map objId, Map<String, String> guidAssignments) { + Object guid = objId.get(KEY_GUID); + + if (guid != null && guidAssignments.containsKey(guid)) { + String assignedGuid = guidAssignments.get(guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.get(KEY_TYPENAME), guid, assignedGuid); + } + + objId.put(KEY_GUID, assignedGuid); + objId.remove(KEY_TYPENAME); + objId.remove(KEY_UNIQUE_ATTRIBUTES); + } + } + + private void updateProcessedEntityReferences(Collection objIds, Map<String, String> guidAssignments) { + for (Object objId : objIds) { + updateProcessedEntityReferences(objId, guidAssignments); + } + } + static class FailedCommitOffsetRecorder { private Long currentOffset; diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java index ff9c9cb..9d6ad22 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java @@ -36,7 +36,7 @@ public class HivePreprocessor { private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class); private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns"; - private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionKeys"; + private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys"; static class HiveTablePreprocessor extends EntityPreprocessor { public HiveTablePreprocessor() { @@ -76,7 +76,7 @@ public class HivePreprocessor { } private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) { - Object attrVal = entity.getAttribute(attrName); + Object attrVal = entity.removeAttribute(attrName); if (attrVal != null) { Set<String> guids = new HashSet<>(); diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index 94e0993..c85c1b8 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -91,6 +91,10 @@ public class PreprocessorContext { public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; } + public boolean isHivePreprocessEnabled() { + return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs; + } + public List<AtlasEntity> getEntities() { return entitiesWithExtInfo != null ? 
entitiesWithExtInfo.getEntities() : null; }