This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 3e4fb5c ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3 3e4fb5c is described below commit 3e4fb5cd84262c6c675e434628e607d7afd518f4 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Sun Mar 31 11:37:46 2019 -0700 ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3 --- .../java/org/apache/atlas/type/AtlasArrayType.java | 344 ++++++++++++--------- .../org/apache/atlas/type/AtlasEntityType.java | 92 +++--- .../apache/atlas/type/AtlasRelationshipType.java | 11 +- .../org/apache/atlas/type/AtlasStructType.java | 1 + .../java/org/apache/atlas/type/AtlasTypeUtil.java | 27 ++ .../org/apache/atlas/type/TestAtlasStructType.java | 4 +- .../converters/AtlasStructFormatConverter.java | 2 +- .../store/graph/v2/AtlasEntityStream.java | 5 + .../notification/NotificationHookConsumer.java | 124 +++++--- .../preprocessor/EntityPreprocessor.java | 1 + .../preprocessor/HivePreprocessor.java | 63 +--- .../preprocessor/PreprocessorContext.java | 186 ++++++++++- .../preprocessor/RdbmsPreprocessor.java | 16 +- 13 files changed, 549 insertions(+), 327 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java index 6147eee..656d946 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java @@ -21,12 +21,14 @@ package org.apache.atlas.type; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -43,33 +45,36 @@ public class AtlasArrayType extends AtlasType { private final String elementTypeName; private int minCount; private int maxCount; + private Cardinality cardinality; private AtlasType elementType; public AtlasArrayType(AtlasType elementType) { - this(elementType, COUNT_NOT_SET, COUNT_NOT_SET); + this(elementType, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST); } - public AtlasArrayType(AtlasType elementType, int minCount, int maxCount) { + public AtlasArrayType(AtlasType elementType, int minCount, int maxCount, Cardinality cardinality) { super(AtlasBaseTypeDef.getArrayTypeName(elementType.getTypeName()), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE); this.elementTypeName = elementType.getTypeName(); this.minCount = minCount; this.maxCount = maxCount; + this.cardinality = cardinality; this.elementType = elementType; } public AtlasArrayType(String elementTypeName, AtlasTypeRegistry typeRegistry) throws AtlasBaseException { - this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, typeRegistry); + this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST, typeRegistry); } - public AtlasArrayType(String elementTypeName, int minCount, int maxCount, AtlasTypeRegistry typeRegistry) + public AtlasArrayType(String elementTypeName, int minCount, int maxCount, Cardinality cardinality, AtlasTypeRegistry typeRegistry) throws AtlasBaseException { super(AtlasBaseTypeDef.getArrayTypeName(elementTypeName), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE); this.elementTypeName = elementTypeName; this.minCount = minCount; this.maxCount = maxCount; + this.cardinality = cardinality; this.resolveReferences(typeRegistry); } @@ -90,6 +95,10 @@ public class AtlasArrayType extends AtlasType { return maxCount; } + public void setCardinality(Cardinality cardinality) { this.cardinality = cardinality; } + + public Cardinality getCardinality() { return cardinality; } + public AtlasType getElementType() { return elementType; } @@ -151,79 +160,12 @@ public class AtlasArrayType extends AtlasType { @Override public boolean areEqualValues(Object val1, Object val2, Map<String, String> guidAssignments) { - boolean ret = true; + final boolean ret; - if (val1 == null) { - ret = val2 == null; - } else if (val2 == null) { - ret = false; + if (cardinality == Cardinality.SET) { + ret = areEqualSets(val1, val2, guidAssignments); } else { - if (val1.getClass().isArray() && val2.getClass().isArray()) { - int len = Array.getLength(val1); - - if (len != Array.getLength(val2)) { - ret = false; - } else { - for (int i = 0; i < len; i++) { - if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) { - ret = false; - - break; - } - } - } - } else if ((val1 instanceof Set) && (val2 instanceof Set)) { - Set set1 = (Set) val1; - Set set2 = (Set) val2; - - if (set1.size() != set2.size()) { - ret = false; - } else { - for (Object elem1 : set1) { - boolean foundInSet2 = false; - - for (Object elem2 : set2) { - if (elementType.areEqualValues(elem1, elem2, guidAssignments)) { - foundInSet2 = true; - - break; - } - } - - if (!foundInSet2) { - ret = false; - - break; - } - } - } - } else { - List list1 = getListFromValue(val1); - - if (list1 == null) { - ret = false; - } else { - List list2 = getListFromValue(val2); - - if (list2 == null) { - ret = false; - } else { - int len = list1.size(); - - if (len != list2.size()) { - ret = false; - } else { - for (int i = 0; i < len; i++) { - if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) { - ret = false; - - break; - } - } - } - } - } - } + ret = areEqualLists(val1, val2, guidAssignments); } return ret; @@ -266,128 +208,120 @@ public class AtlasArrayType extends AtlasType { @Override public Collection<?> getNormalizedValue(Object obj) { - if (obj == null) { - return null; - } + Collection<Object> ret = null; - if (obj instanceof String){ - obj = AtlasType.fromJson(obj.toString(), List.class); + if (obj instanceof String) { + obj = AtlasType.fromJson(obj.toString(), List.class); } if (obj instanceof List || obj instanceof Set) { - List<Object> ret = new ArrayList<>(); + Collection collObj = (Collection) obj; - Collection objList = (Collection) obj; + if (isValidElementCount(collObj.size())) { + ret = new ArrayList<>(collObj.size()); - if (!isValidElementCount(objList.size())) { - return null; - } + for (Object element : collObj) { + if (element != null) { + Object normalizedValue = elementType.getNormalizedValue(element); - for (Object element : objList) { - if (element != null) { - Object normalizedValue = elementType.getNormalizedValue(element); + if (normalizedValue != null) { + ret.add(normalizedValue); + } else { + ret = null; // invalid element value - if (normalizedValue != null) { - ret.add(normalizedValue); + break; + } } else { - return null; // invalid element value + ret.add(element); } - } else { - ret.add(element); } } - - return ret; - } else if (obj.getClass().isArray()) { - List<Object> ret = new ArrayList<>(); - + } else if (obj != null && obj.getClass().isArray()) { int arrayLen = Array.getLength(obj); - if (!isValidElementCount(arrayLen)) { - return null; - } + if (isValidElementCount(arrayLen)) { + ret = new ArrayList<>(arrayLen); - for (int i = 0; i < arrayLen; i++) { - Object element = Array.get(obj, i); + for (int i = 0; i < arrayLen; i++) { + Object element = Array.get(obj, i); - if (element != null) { - Object normalizedValue = elementType.getNormalizedValue(element); + if (element != null) { + Object normalizedValue = elementType.getNormalizedValue(element); + + if (normalizedValue != null) { + ret.add(normalizedValue); + } else { + ret = null; // invalid element value - if (normalizedValue != null) { - ret.add(normalizedValue); + break; + } } else { - return null; // invalid element value + ret.add(element); } - } else { - ret.add(element); } } - - return ret; } - return null; + return ret; } @Override public Collection<?> getNormalizedValueForUpdate(Object obj) { - if (obj == null) { - return null; + Collection<Object> ret = null; + + if (obj instanceof String) { + obj = AtlasType.fromJson(obj.toString(), List.class); } if (obj instanceof List || obj instanceof Set) { - List<Object> ret = new ArrayList<>(); - Collection objList = (Collection) obj; - if (!isValidElementCount(objList.size())) { - return null; - } + if (isValidElementCount(objList.size())) { + ret = new ArrayList<>(objList.size()); - for (Object element : objList) { - if (element != null) { - Object normalizedValue = elementType.getNormalizedValueForUpdate(element); + for (Object element : objList) { + if (element != null) { + Object normalizedValue = elementType.getNormalizedValueForUpdate(element); - if (normalizedValue != null) { - ret.add(normalizedValue); + if (normalizedValue != null) { + ret.add(normalizedValue); + } else { + ret = null; // invalid element value + + break; + } } else { - return null; // invalid element value + ret.add(element); } - } else { - ret.add(element); } } - - return ret; - } else if (obj.getClass().isArray()) { - List<Object> ret = new ArrayList<>(); - + } else if (obj != null && obj.getClass().isArray()) { int arrayLen = Array.getLength(obj); - if (!isValidElementCount(arrayLen)) { - return null; - } + if (isValidElementCount(arrayLen)) { + ret = new ArrayList<>(arrayLen); - for (int i = 0; i < arrayLen; i++) { - Object element = Array.get(obj, i); + for (int i = 0; i < arrayLen; i++) { + Object element = Array.get(obj, i); - if (element != null) { - Object normalizedValue = elementType.getNormalizedValueForUpdate(element); + if (element != null) { + Object normalizedValue = elementType.getNormalizedValueForUpdate(element); + + if (normalizedValue != null) { + ret.add(normalizedValue); + } else { + ret = null; // invalid element value - if (normalizedValue != null) { - ret.add(normalizedValue); + break; + } } else { - return null; // invalid element value + ret.add(element); } - } else { - ret.add(element); } } - - return ret; } - return null; + return ret; } @Override @@ -483,7 +417,7 @@ public class AtlasArrayType extends AtlasType { if (elementAttributeType == elementType) { return this; } else { - AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount); + AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount, cardinality); if (LOG.isDebugEnabled()) { LOG.debug("getTypeForAttribute(): {} ==> {}", getTypeName(), attributeType.getTypeName()); @@ -523,29 +457,131 @@ public class AtlasArrayType extends AtlasType { return false; } + private boolean areEqualSets(Object val1, Object val2, Map<String, String> guidAssignments) { + boolean ret = true; + + if (val1 == null) { + ret = val2 == null; + } else if (val2 == null) { + ret = false; + } else if (val1 == val2) { + ret = true; + } else { + Set set1 = getSetFromValue(val1); + Set set2 = getSetFromValue(val2); + + if (set1.size() != set2.size()) { + ret = false; + } else { + for (Object elem1 : set1) { + boolean foundInSet2 = false; + + for (Object elem2 : set2) { + if (elementType.areEqualValues(elem1, elem2, guidAssignments)) { + foundInSet2 = true; + + break; + } + } + + if (!foundInSet2) { + ret = false; + + break; + } + } + } + } + + return ret; + } + + private boolean areEqualLists(Object val1, Object val2, Map<String, String> guidAssignments) { + boolean ret = true; + + if (val1 == null) { + ret = val2 == null; + } else if (val2 == null) { + ret = false; + } else if (val1 == val2) { + ret = true; + } else if (val1.getClass().isArray() && val2.getClass().isArray()) { + int len = Array.getLength(val1); + + if (len != Array.getLength(val2)) { + ret = false; + } else { + for (int i = 0; i < len; i++) { + if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) { + ret = false; + + break; + } + } + } + } else { + List list1 = getListFromValue(val1); + List list2 = getListFromValue(val2); + + if (list1.size() != list2.size()) { + ret = false; + } else { + int len = list1.size(); + + for (int i = 0; i < len; i++) { + if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) { + ret = false; + + break; + } + } + } + } + + return ret; + } + private List getListFromValue(Object val) { final List ret; if (val instanceof List) { ret = (List) val; } else if (val instanceof Collection) { - int len = ((Collection) val).size(); + ret = new ArrayList<>((Collection) val); + } else if (val.getClass().isArray()) { + int len = Array.getLength(val); ret = new ArrayList<>(len); - for (Object elem : ((Collection) val)) { - ret.add(elem); + for (int i = 0; i < len; i++) { + ret.add(Array.get(val, i)); } + } else if (val instanceof String){ + ret = AtlasType.fromJson(val.toString(), List.class); + } else { + ret = null; + } + + return ret; + } + + private Set getSetFromValue(Object val) { + final Set ret; + + if (val instanceof Set) { + ret = (Set) val; + } else if (val instanceof Collection) { + ret = new HashSet<>((Collection) val); } else if (val.getClass().isArray()) { int len = Array.getLength(val); - ret = new ArrayList<>(len); + ret = new HashSet<>(len); for (int i = 0; i < len; i++) { ret.add(Array.get(val, i)); } } else if (val instanceof String){ - ret = AtlasType.fromJson(val.toString(), List.class); + ret = AtlasType.fromJson(val.toString(), Set.class); } else { ret = null; } diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java index 1ce776e..d9ae9e3 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java @@ -22,7 +22,6 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; @@ -544,7 +543,9 @@ public class AtlasEntityType extends AtlasStructType { superType.normalizeAttributeValues(ent); } - normalizeValues(ent); + super.normalizeAttributeValues(ent); + + normalizeRelationshipAttributeValues(ent, false); } } @@ -555,6 +556,8 @@ public class AtlasEntityType extends AtlasStructType { } super.normalizeAttributeValuesForUpdate(ent); + + normalizeRelationshipAttributeValues(ent, true); } } @@ -565,7 +568,9 @@ public class AtlasEntityType extends AtlasStructType { superType.normalizeAttributeValues(obj); } - normalizeValues(obj); + super.normalizeAttributeValues(obj); + + normalizeRelationshipAttributeValues(obj, false); } } @@ -576,6 +581,8 @@ public class AtlasEntityType extends AtlasStructType { } super.normalizeAttributeValuesForUpdate(obj); + + normalizeRelationshipAttributeValues(obj, true); } } @@ -743,67 +750,56 @@ public class AtlasEntityType extends AtlasStructType { return ret; } - private void normalizeRelationshipAttributeValues(AtlasStruct obj) { - if (obj != null && obj instanceof AtlasEntity) { - AtlasEntity entityObj = (AtlasEntity) obj; - + private void normalizeRelationshipAttributeValues(AtlasEntity entity, boolean isUpdate) { + if (entity != null) { for (String attributeName : relationshipAttributes.keySet()) { - if (entityObj.hasRelationshipAttribute(attributeName)) { - Object attributeValue = entityObj.getRelationshipAttribute(attributeName); - String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue); - AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType); - AtlasAttributeDef attributeDef = attribute.getAttributeDef(); + if (entity.hasRelationshipAttribute(attributeName)) { + Object attributeValue = entity.getRelationshipAttribute(attributeName); + String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue); + AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType); - attributeValue = getNormalizedValue(attributeValue, attributeDef); + if (attribute != null) { + AtlasType attrType = attribute.getAttributeType(); + + if (isValidRelationshipType(attrType)) { + if (isUpdate) { + attributeValue = attrType.getNormalizedValueForUpdate(attributeValue); + } else { + attributeValue = attrType.getNormalizedValue(attributeValue); + } - entityObj.setRelationshipAttribute(attributeName, attributeValue); + entity.setRelationshipAttribute(attributeName, attributeValue); + } + } } } } } - public void normalizeRelationshipAttributeValues(Map<String, Object> obj) { + public void normalizeRelationshipAttributeValues(Map<String, Object> obj, boolean isUpdate) { if (obj != null) { for (String attributeName : relationshipAttributes.keySet()) { if (obj.containsKey(attributeName)) { - Object attributeValue = obj.get(attributeName); - String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue); - AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType); - AtlasAttributeDef attributeDef = attribute.getAttributeDef(); - - attributeValue = getNormalizedValue(attributeValue, attributeDef); - - obj.put(attributeName, attributeValue); - } - } - } - } + Object attributeValue = obj.get(attributeName); + String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue); + AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType); - private Object getNormalizedValue(Object value, AtlasAttributeDef attributeDef) { - String relationshipType = AtlasEntityUtil.getRelationshipType(value); - AtlasAttribute attribute = getRelationshipAttribute(attributeDef.getName(), relationshipType); + if (attribute != null) { + AtlasType attrType = attribute.getAttributeType(); - if (attribute != null) { - AtlasType attrType = attribute.getAttributeType(); + if (isValidRelationshipType(attrType)) { + if (isUpdate) { + attributeValue = attrType.getNormalizedValueForUpdate(attributeValue); + } else { + attributeValue = attrType.getNormalizedValue(attributeValue); + } - if (isValidRelationshipType(attrType) && value != null) { - return attrType.getNormalizedValue(value); + obj.put(attributeName, attributeValue); + } + } + } } } - - return null; - } - - private void normalizeValues(AtlasEntity ent) { - super.normalizeAttributeValues(ent); - - normalizeRelationshipAttributeValues(ent); - } - - private void normalizeValues(Map<String, Object> obj) { - super.normalizeAttributeValues(obj); - - normalizeRelationshipAttributeValues(obj); } private boolean validateRelationshipAttributes(Object obj, String objName, List<String> messages) { diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java index 585d176..98071b2 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java @@ -346,8 +346,15 @@ public class AtlasRelationshipType extends AtlasStructType { attributeDef.addConstraint(constraint); } - attribute = new AtlasAttribute(entityType, attributeDef, - typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel); + AtlasType attrType = typeRegistry.getType(attrTypeName); + + if (attrType instanceof AtlasArrayType) { + AtlasArrayType arrayType = (AtlasArrayType) attrType; + + arrayType.setCardinality(attributeDef.getCardinality()); + } + + attribute = new AtlasAttribute(entityType, attributeDef, attrType, getTypeName(), relationshipLabel); attribute.setLegacyAttribute(endDef.getIsLegacyAttribute()); } else { diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java index fb24df0..31953bd 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java @@ -99,6 +99,7 @@ public class AtlasStructType extends AtlasType { arrayType.setMinCount(attributeDef.getValuesMinCount()); arrayType.setMaxCount(attributeDef.getValuesMaxCount()); + arrayType.setCardinality(cardinality); } //check if attribute type is not classification diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java index 079a8fc..d74c7e3 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java @@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; import org.apache.atlas.model.typedef.AtlasTypeDefHeader; import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.v1.model.typedef.AttributeDefinition; import org.apache.atlas.v1.model.typedef.ClassTypeDefinition; import org.apache.atlas.v1.model.typedef.Multiplicity; @@ -413,10 +414,36 @@ public class AtlasTypeUtil { return new AtlasRelatedObjectId(getAtlasObjectId(entity)); } + public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) { + return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry)); + } + public static AtlasObjectId getAtlasObjectId(AtlasEntity entity) { return new AtlasObjectId(entity.getGuid(), entity.getTypeName()); } + public static AtlasObjectId getAtlasObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) { + String typeName = entity.getTypeName(); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + Map<String, Object> uniqAttributes = null; + + if (entityType != null && MapUtils.isNotEmpty(entityType.getUniqAttributes())) { + for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) { + Object attrValue = entity.getAttribute(attribute.getName()); + + if (attrValue != null) { + if (uniqAttributes == null) { + uniqAttributes = new HashMap<>(); + } + + uniqAttributes.put(attribute.getName(), attrValue); + } + } + } + + return new AtlasObjectId(entity.getGuid(), typeName, uniqAttributes); + } + public static AtlasObjectId getAtlasObjectId(AtlasEntityHeader header) { return new AtlasObjectId(header.getGuid(), header.getTypeName()); } diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java index a37dd46..f117fb3 100644 --- a/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java +++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java @@ -63,12 +63,12 @@ public class TestAtlasStructType { multiValuedAttribMin.setName(MULTI_VAL_ATTR_NAME_MIN); multiValuedAttribMin.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT)); - multiValuedAttribMin.setCardinality(Cardinality.SET); + multiValuedAttribMin.setCardinality(Cardinality.LIST); multiValuedAttribMin.setValuesMinCount(MULTI_VAL_ATTR_MIN_COUNT); multiValuedAttribMax.setName(MULTI_VAL_ATTR_NAME_MAX); multiValuedAttribMax.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT)); - multiValuedAttribMax.setCardinality(Cardinality.LIST); + multiValuedAttribMax.setCardinality(Cardinality.SET); multiValuedAttribMax.setValuesMaxCount(MULTI_VAL_ATTR_MAX_COUNT); AtlasStructDef structDef = ModelTestUtil.newStructDef(); diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java index 173fcee..ae92b8b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java @@ -190,7 +190,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { if (entities != null) { v2Value = entities; - attrType = new AtlasArrayType(entityType); + attrType = new AtlasArrayType(entityType, arrayType.getMinCount(), arrayType.getMaxCount(), arrayType.getCardinality()); if (LOG.isDebugEnabled()) { LOG.debug("{}: replaced objIdList with entityList", attr.getQualifiedName()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java index d12b036..c823b20 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java @@ -22,6 +22,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import java.util.Iterator; +import java.util.List; public class AtlasEntityStream implements EntityStream { protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo; @@ -33,6 +34,10 @@ public class AtlasEntityStream implements EntityStream { this(new AtlasEntitiesWithExtInfo(entity), null); } + public AtlasEntityStream(List<AtlasEntity> entities) { + this(new AtlasEntitiesWithExtInfo(entities), null); + } + public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) { this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), null); } diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 48355c9..8430fd4 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -34,6 +34,7 @@ import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.notification.HookNotification; @@ -79,7 +80,6 @@ import javax.inject.Inject; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -492,7 +492,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return; } - preProcessNotificationMessage(kafkaMsg); + PreprocessorContext context = preProcessNotificationMessage(kafkaMsg); if (isEmptyMessage(kafkaMsg)) { commit(kafkaMsg); @@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false); + createOrUpdate(entities, false, context); } break; @@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl // There should only be one root entity entities.getEntities().get(0).setGuid(guid); - createOrUpdate(entities, true); + createOrUpdate(entities, true, context); } break; @@ -579,7 +579,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false); + createOrUpdate(entities, false, context); } break; @@ -593,7 +593,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false); + createOrUpdate(entities, false, context); } break; @@ -622,7 +622,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false); + createOrUpdate(entities, false, context); } break; @@ -708,15 +708,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } - private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate) throws AtlasBaseException { + private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, PreprocessorContext context) throws AtlasBaseException { List<AtlasEntity> entitiesList = entities.getEntities(); AtlasEntityStream entityStream = new AtlasEntityStream(entities); if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) { - atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate); - } else { - Map<String, String> guidAssignments = new HashMap<>(); + EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate); + recordProcessedEntities(response, context); + } else { for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) { int toIndex = fromIdx + commitBatchSize; @@ -726,20 +726,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex)); - updateProcessedEntityReferences(entitiesBatch, guidAssignments); + updateProcessedEntityReferences(entitiesBatch, context.getGuidAssignments()); AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(entitiesBatch); AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream); EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate); - recordProcessedEntities(response, guidAssignments); + recordProcessedEntities(response, context); RequestContext.get().resetEntityGuidUpdates(); RequestContext.get().clearCache(); } } + + if (context != null) { + context.prepareForPostUpdate(); + + List<AtlasEntity> postUpdateEntities = context.getPostUpdateEntities(); + + if (CollectionUtils.isNotEmpty(postUpdateEntities)) { + atlasEntityStore.createOrUpdate(new AtlasEntityStream(postUpdateEntities), true); + } + } } private void recordFailedMessages() { @@ -815,49 +825,51 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } - private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) { - if (!preprocessEnabled) { - return; - } + private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) { + PreprocessorContext context = null; - PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs); + if (preprocessEnabled) { + context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs); - if (context.isHivePreprocessEnabled()) { - preprocessHiveTypes(context); - } + if (context.isHivePreprocessEnabled()) { + preprocessHiveTypes(context); + } - if (skipHiveColumnLineageHive20633) { - skipHiveColumnLineage(context); - } + if (skipHiveColumnLineageHive20633) { + skipHiveColumnLineage(context); + } - if (rdbmsTypesRemoveOwnedRefAttrs) { - rdbmsTypeRemoveOwnedRefAttrs(context); - } + if (rdbmsTypesRemoveOwnedRefAttrs) { + rdbmsTypeRemoveOwnedRefAttrs(context); + } - context.moveRegisteredReferredEntities(); + context.moveRegisteredReferredEntities(); - if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) { - // move hive_process and hive_column_lineage entities to end of the list - List<AtlasEntity> entities = context.getEntities(); - int count = entities.size(); + if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) { + // move hive_process and hive_column_lineage entities to end of the list + List<AtlasEntity> entities = context.getEntities(); + int count = entities.size(); - for (int i = 0; i < count; i++) { - AtlasEntity entity = entities.get(i); + for (int i = 0; i < count; i++) { + AtlasEntity entity = entities.get(i); - switch (entity.getTypeName()) { - case TYPE_HIVE_PROCESS: - case TYPE_HIVE_COLUMN_LINEAGE: - entities.remove(i--); - entities.add(entity); - count--; - break; + switch (entity.getTypeName()) { + case TYPE_HIVE_PROCESS: + case TYPE_HIVE_COLUMN_LINEAGE: + entities.remove(i--); + entities.add(entity); + count--; + break; + } } - } - if (entities.size() - count > 0) { - LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size()); + if (entities.size() - count > 0) { + LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size()); + } } } + + return context; } private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) { @@ -1009,12 +1021,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return ret; } - private void recordProcessedEntities(EntityMutationResponse mutationResponse, Map<String, String> guidAssignments) { - if (mutationResponse != null && MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { - guidAssignments.putAll(mutationResponse.getGuidAssignments()); + private void recordProcessedEntities(EntityMutationResponse mutationResponse, PreprocessorContext context) { + if (mutationResponse != null && context != null) { + if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { + context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments()); + } - if (LOG.isDebugEnabled()) { - LOG.debug("recordProcessedEntities: added {} guidAssignments. updatedSize={}", mutationResponse.getGuidAssignments().size(), guidAssignments.size()); + if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) { + for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) { + if (entity != null && entity.getGuid() != null) { + context.getCreatedEntities().add(entity.getGuid()); + } + } + } + + if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) { + for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) { + if (entity != null && entity.getGuid() != null) { + context.getDeletedEntities().add(entity.getGuid()); + } + } } } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java index 085e746..9b620dd 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -53,6 +53,7 @@ public abstract class EntityPreprocessor { public static final String ATTRIBUTE_TABLES = "tables"; public static final String ATTRIBUTE_INDEXES = "indexes"; public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys"; + public static final String ATTRIBUTE_INSTANCE = "instance"; public static final char QNAME_SEP_CLUSTER_NAME = '@'; public static final char QNAME_SEP_ENTITY_NAME = '.'; diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java index 0b93658..cc31032 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java @@ -18,8 +18,6 @@ package org.apache.atlas.notification.preprocessor; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -27,16 +25,14 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; public class HivePreprocessor { private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class); private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns"; private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys"; + private static final String RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC = "hive_table_storagedesc"; static class HiveTablePreprocessor extends EntityPreprocessor { public HiveTablePreprocessor() { @@ -67,63 +63,12 @@ public class HivePreprocessor { entity.setAttribute(ATTRIBUTE_COLUMNS, null); entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null); } else if (context.getHiveTypesRemoveOwnedRefAttrs()) { - context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD); - - removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context); - removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD, RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC, ATTRIBUTE_TABLE); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, ATTRIBUTE_TABLE); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, ATTRIBUTE_TABLE); } } } - - private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) { - Object attrVal = entity.removeAttribute(attrName); - - if (attrVal != null) { - Set<String> guids = new HashSet<>(); - - context.collectGuids(attrVal, guids); - - for (String guid : guids) { - AtlasEntity colEntity = context.getEntity(guid); - - if (colEntity != null) { - Object attrTable = null; - - if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) { - attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE); - } else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) { - attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE); - } - - attrTable = setRelationshipType(attrTable, relationshipType); - - if (attrTable != null) { - colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable); - } - - context.addToReferredEntitiesToMove(guid); - } - } - } - } - - private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) { - AtlasRelatedObjectId ret = null; - - if (attr instanceof AtlasRelatedObjectId) { - ret = (AtlasRelatedObjectId) attr; - } else if (attr instanceof AtlasObjectId) { - ret = new AtlasRelatedObjectId((AtlasObjectId) attr); - } else if (attr instanceof Map) { - ret = new AtlasRelatedObjectId((Map) attr); - } - - if (ret != null) { - ret.setRelationshipType(relationshipType); - } - - return ret; - } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index c85c1b8..2db0574 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -21,18 +21,27 @@ import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID; + public class PreprocessorContext { private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class); @@ -40,6 +49,7 @@ public class PreprocessorContext { public enum PreprocessAction { NONE, IGNORE, PRUNE } private final AtlasKafkaMessage<HookNotification> kafkaMessage; + private final AtlasTypeRegistry typeRegistry; private final AtlasEntitiesWithExtInfo entitiesWithExtInfo; private final List<Pattern> hiveTablesToIgnore; private final List<Pattern> hiveTablesToPrune; @@ -49,9 +59,14 @@ public class PreprocessorContext { private final Set<String> ignoredEntities = new HashSet<>(); private final Set<String> prunedEntities = new HashSet<>(); private final Set<String> referredEntitiesToMove = new HashSet<>(); + private final Set<String> createdEntities = new HashSet<>(); + private final Set<String> deletedEntities = new HashSet<>(); + private final Map<String, String> guidAssignments = new HashMap<>(); + private List<AtlasEntity> postUpdateEntities = null; - public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) { + public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) { this.kafkaMessage = kafkaMessage; + this.typeRegistry = typeRegistry; this.hiveTablesToIgnore = hiveTablesToIgnore; this.hiveTablesToPrune = hiveTablesToPrune; this.hiveTablesCache = hiveTablesCache; @@ -119,6 +134,14 @@ public class PreprocessorContext { public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; } + public Set<String> getCreatedEntities() { return createdEntities; } + + public Set<String> getDeletedEntities() { return deletedEntities; } + + public Map<String, String> getGuidAssignments() { return guidAssignments; } + + public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; } + public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) { PreprocessAction ret = PreprocessAction.NONE; @@ -199,12 +222,48 @@ public class PreprocessorContext { collectGuids(obj, prunedEntities); } - public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName) { - Set<String> guidsToMove = new HashSet<>(); + public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, String refAttrName) { + Object attrVal = entity.removeAttribute(attrName); + + if (attrVal != null) { + AtlasRelatedObjectId entityId = null; + Set<String> guids = new HashSet<>(); + + collectGuids(attrVal, guids); + + // removed attrVal might have elements removed (e.g. removed column); to handle this case register the entity for partial update + addToPostUpdate(entity, attrName, attrVal); + + for (String guid : guids) { + AtlasEntity refEntity = getEntity(guid); + + if (refEntity != null) { + Object refAttr = null; + + if (refEntity.hasRelationshipAttribute(refAttrName)) { + refAttr = refEntity.getRelationshipAttribute(refAttrName); + } else if (refEntity.hasAttribute(refAttrName)) { + refAttr = refEntity.getAttribute(refAttrName); + } else { + if (entityId == null) { + entityId = AtlasTypeUtil.toAtlasRelatedObjectId(entity, typeRegistry); + } + + refAttr = entityId; + } + + if (refAttr != null) { + refAttr = setRelationshipType(refAttr, relationshipType); + } - collectGuids(entity.removeAttribute(attrName), guidsToMove); + if (refAttr != null) { + refEntity.setRelationshipAttribute(refAttrName, refAttr); + } - addToReferredEntitiesToMove(guidsToMove); + addToReferredEntitiesToMove(guid); + } + } + } } public void moveRegisteredReferredEntities() { @@ -236,6 +295,32 @@ public class PreprocessorContext { } } + public void prepareForPostUpdate() { + if (postUpdateEntities != null) { + ListIterator<AtlasEntity> iter = postUpdateEntities.listIterator(); + + while (iter.hasNext()) { + AtlasEntity entity = iter.next(); + String assignedGuid = getAssignedGuid(entity.getGuid()); + + // no need to perform partial-update for entities that are created/deleted while processing this message + if (createdEntities.contains(assignedGuid) || deletedEntities.contains(assignedGuid)) { + iter.remove(); + } else { + entity.setGuid(assignedGuid); + + if (entity.getAttributes() != null) { + setAssignedGuids(entity.getAttributes().values()); + } + + if (entity.getRelationshipAttributes() != null) { + setAssignedGuids(entity.getRelationshipAttributes().values()); + } + } + } + } + } + public String getTypeName(Object obj) { Object ret = null; @@ -258,7 +343,7 @@ public class PreprocessorContext { if (obj instanceof AtlasObjectId) { ret = ((AtlasObjectId) obj).getGuid(); } else if (obj instanceof Map) { - ret = ((Map) obj).get(AtlasObjectId.KEY_GUID); + ret = ((Map) obj).get(KEY_GUID); } else if (obj instanceof AtlasEntity) { ret = ((AtlasEntity) obj).getGuid(); } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) { @@ -274,7 +359,7 @@ public class PreprocessorContext { Collection objList = (Collection) obj; for (Object objElem : objList) { - collectGuid(objElem, guids); + collectGuids(objElem, guids); } } else { collectGuid(obj, guids); @@ -304,4 +389,91 @@ public class PreprocessorContext { return ret; } + + private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) { + AtlasRelatedObjectId ret = null; + + if (attr instanceof AtlasRelatedObjectId) { + ret = (AtlasRelatedObjectId) attr; + } else if (attr instanceof AtlasObjectId) { + ret = new AtlasRelatedObjectId((AtlasObjectId) attr); + } else if (attr instanceof Map) { + ret = new AtlasRelatedObjectId((Map) attr); + } + + if (ret != null) { + ret.setRelationshipType(relationshipType); + } + + return ret; + } + + private String getAssignedGuid(String guid) { + String ret = guidAssignments.get(guid); + + return ret != null ? ret : guid; + } + + private void setAssignedGuids(Object obj) { + if (obj != null) { + if (obj instanceof Collection) { + Collection objList = (Collection) obj; + + for (Object objElem : objList) { + setAssignedGuids(objElem); + } + } else { + setAssignedGuid(obj); + } + } + } + + private void setAssignedGuid(Object obj) { + if (obj instanceof AtlasRelatedObjectId) { + AtlasRelatedObjectId objId = (AtlasRelatedObjectId) obj; + + objId.setGuid(getAssignedGuid(objId.getGuid())); + } else if (obj instanceof AtlasObjectId) { + AtlasObjectId objId = (AtlasObjectId) obj; + + objId.setGuid(getAssignedGuid(objId.getGuid())); + } else if (obj instanceof Map) { + Map objId = (Map) obj; + Object guid = objId.get(KEY_GUID); + + if (guid != null) { + objId.put(KEY_GUID, getAssignedGuid(guid.toString())); + } + } + } + + private void addToPostUpdate(AtlasEntity entity, String attrName, Object attrVal) { + if (LOG.isDebugEnabled()) { + LOG.debug("addToPostUpdate(guid={}, entityType={}, attrName={}", entity.getGuid(), entity.getTypeName(), attrName); + } + + AtlasEntity partialEntity = null; + + if (postUpdateEntities == null) { + postUpdateEntities = new ArrayList<>(); + } + + for (AtlasEntity existing : postUpdateEntities) { + if (StringUtils.equals(entity.getGuid(), existing.getGuid())) { + partialEntity = existing; + + break; + } + } + + if (partialEntity == null) { + partialEntity = new AtlasEntity(entity.getTypeName(), attrName, attrVal); + + partialEntity.setGuid(entity.getGuid()); + + postUpdateEntities.add(partialEntity); + } else { + partialEntity.setAttribute(attrName, attrVal); + } + } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java index adc1983..7dcfa2f 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java @@ -32,6 +32,12 @@ import java.util.Set; public class RdbmsPreprocessor { private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class); + private static final String RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES = "rdbms_instance_databases"; + private static final String RELATIONSHIP_TYPE_RDBMS_DB_TABLES = "rdbms_db_tables"; + private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS = "rdbms_table_columns"; + private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES = "rdbms_table_indexes"; + private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS = "rdbms_table_foreign_key"; + static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor { public RdbmsInstancePreprocessor() { super(TYPE_RDBMS_INSTANCE); @@ -121,17 +127,17 @@ public class RdbmsPreprocessor { private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) { switch (entity.getTypeName()) { case TYPE_RDBMS_INSTANCE: - context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES, RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES, ATTRIBUTE_INSTANCE); break; case TYPE_RDBMS_DB: - context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES, RELATIONSHIP_TYPE_RDBMS_DB_TABLES, ATTRIBUTE_DB); break; case TYPE_RDBMS_TABLE: - context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS); - context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES); - context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS, ATTRIBUTE_TABLE); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES, RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES, ATTRIBUTE_TABLE); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS, RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS, ATTRIBUTE_TABLE); break; } }