http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java deleted file mode 100644 index ddd2242..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - -import com.google.common.base.Optional; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; -import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.graphdb.AtlasEdge; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.type.AtlasArrayType; -import org.apache.atlas.type.AtlasStructType; -import org.apache.atlas.type.AtlasType; -import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.apache.atlas.repository.graph.GraphHelper.string; - -@Singleton -public class ArrayVertexMapper implements InstanceGraphMapper<List> { - - private static final Logger LOG = LoggerFactory.getLogger(ArrayVertexMapper.class); - - protected final DeleteHandlerV1 deleteHandler; - - protected StructVertexMapper structVertexMapper; - - @Inject - public ArrayVertexMapper(DeleteHandlerV1 deleteHandler) { - this.deleteHandler = deleteHandler; - } - - void init(StructVertexMapper structVertexMapper) { - this.structVertexMapper = structVertexMapper; - } - - @Override - public List toGraph(GraphMutationContext ctx) throws AtlasBaseException { - - if (LOG.isDebugEnabled()) { - LOG.debug("Mapping instance to vertex {} for array attribute {}", string(ctx.getReferringVertex()), ctx.getAttrType().getTypeName()); - } - - List newElements = (List) ctx.getValue(); - boolean newAttributeEmpty = (newElements == null || newElements.isEmpty()); - - AtlasArrayType arrType = (AtlasArrayType) ctx.getAttrType(); - AtlasType elementType = arrType.getElementType(); - List<Object> currentElements = getArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexPropertyKey()); - - List<Object> newElementsCreated = new ArrayList<>(); - - if (!newAttributeEmpty) { - for (int index = 0; index < newElements.size(); index++) { - - LOG.debug("Adding/updating element at position {}, current element {}, new element {}", index, - (currentElements != null && index < currentElements.size()) ? currentElements.get(index) : null, newElements.get(index)); - - Optional<AtlasEdge> existingEdge = getEdgeAt(currentElements, index, arrType.getElementType()); - - GraphMutationContext arrCtx = new GraphMutationContext.Builder(ctx.getOp(), ctx.getAttribute(), - arrType.getElementType(), newElements.get(index)) - .referringVertex(ctx.getReferringVertex()) - .edge(existingEdge) - .vertexProperty(ctx.getVertexPropertyKey()).build(); - - Object newEntry = structVertexMapper.mapCollectionElementsToVertex(arrCtx); - newElementsCreated.add(newEntry); - } - } - - if (AtlasGraphUtilsV1.isReference(elementType)) { - List<AtlasEdge> additionalEdges = removeUnusedArrayEntries(ctx.getParentType(), ctx.getAttributeDef(), (List) currentElements, (List) newElementsCreated, elementType); - newElementsCreated.addAll(additionalEdges); - } - - // for dereference on way out - setArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexPropertyKey(), newElementsCreated); - return newElementsCreated; - } - - @Override - public void cleanUp() throws AtlasBaseException { - - } - - //Removes unused edges from the old collection, compared to the new collection - private List<AtlasEdge> removeUnusedArrayEntries( - AtlasStructType entityType, - AtlasAttributeDef attributeDef, - List<AtlasEdge> currentEntries, - List<AtlasEdge> newEntries, - AtlasType entryType) throws AtlasBaseException { - if (currentEntries != null && !currentEntries.isEmpty()) { - LOG.debug("Removing unused entries from the old collection"); - if (AtlasGraphUtilsV1.isReference(entryType)) { - - Collection<AtlasEdge> edgesToRemove = CollectionUtils.subtract(currentEntries, newEntries); - - LOG.debug("Removing unused entries from the old collection - {}", edgesToRemove); - - if (!edgesToRemove.isEmpty()) { - //Remove the edges for (current edges - new edges) - List<AtlasEdge> additionalElements = new ArrayList<>(); - - for (AtlasEdge edge : edgesToRemove) { - boolean deleteChildReferences = StructVertexMapper.shouldManageChildReferences(entityType, attributeDef.getName()); - boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(), - deleteChildReferences, true); - if (!deleted) { - additionalElements.add(edge); - } - } - - return additionalElements; - } - } - } - return Collections.emptyList(); - } - - public static List<Object> getArrayElementsProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName) { - String actualPropertyName = GraphHelper.encodePropertyKey(propertyName); - if (AtlasGraphUtilsV1.isReference(elementType)) { - return (List)instanceVertex.getListProperty(actualPropertyName, AtlasEdge.class); - } - else { - return (List)instanceVertex.getListProperty(actualPropertyName); - } - } - - private Optional<AtlasEdge> getEdgeAt(List<Object> currentElements, int index, AtlasType elemType) { - Optional<AtlasEdge> existingEdge = Optional.absent(); - if ( AtlasGraphUtilsV1.isReference(elemType) ) { - Object currentElement = (currentElements != null && index < currentElements.size()) ? - currentElements.get(index) : null; - - if ( currentElement != null) { - existingEdge = Optional.of((AtlasEdge) currentElement); - } - } - - return existingEdge; - } - - private void setArrayElementsProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName, List<Object> values) { - String actualPropertyName = GraphHelper.encodePropertyKey(propertyName); - if (AtlasGraphUtilsV1.isReference(elementType)) { - GraphHelper.setListPropertyFromElementIds(instanceVertex, actualPropertyName, (List) values); - } - else { - GraphHelper.setProperty(instanceVertex, actualPropertyName, values); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java index 2b0804f..7141911 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java @@ -17,10 +17,10 @@ */ package org.apache.atlas.repository.store.graph.v1; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,7 +31,6 @@ import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; -import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.EntityResolver; @@ -41,38 +40,21 @@ import org.apache.atlas.type.AtlasMapType; import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.commons.lang3.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.inject.Inject; -import com.google.inject.Provider; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class); private final AtlasTypeRegistry typeRegistry; - private final EntityGraphDiscoveryContext discoveredEntities; - private final Set<String> processedIds = new HashSet<>(); - private final Collection<EntityResolver> entityResolvers = new LinkedHashSet<>(); - - @Inject - public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, Collection<Provider<EntityResolver>> entityResolverProviders) { - this.typeRegistry = typeRegistry; - this.discoveredEntities = new EntityGraphDiscoveryContext(typeRegistry); + private final EntityGraphDiscoveryContext discoveryContext; - for (Provider<EntityResolver> entityResolverProvider : entityResolverProviders) { - entityResolvers.add(entityResolverProvider.get()); - } - } - - @VisibleForTesting - public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, List<EntityResolver> entityResolvers) { - this.typeRegistry = typeRegistry; - this.discoveredEntities = new EntityGraphDiscoveryContext(typeRegistry); - - for (EntityResolver entityResolver : entityResolvers) { - this.entityResolvers.add(entityResolver); - } + public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) { + this.typeRegistry = typeRegistry; + this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream); } @Override @@ -81,161 +63,239 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { } @Override - public EntityGraphDiscoveryContext discoverEntities(final List<AtlasEntity> entities) throws AtlasBaseException { + public EntityGraphDiscoveryContext discoverEntities() throws AtlasBaseException { - //walk the graph and discover entity references - discover(entities); + // walk through entities in stream and validate them; record entity references + discoverAndValidate(); - //resolve root and referred entities + // resolve entity references discovered in previous step resolveReferences(); - return discoveredEntities; + return discoveryContext; } @Override public void cleanUp() throws AtlasBaseException { - processedIds.clear(); - discoveredEntities.cleanUp(); - - for (EntityResolver resolver : entityResolvers) { - resolver.cleanUp(); - } + discoveryContext.cleanUp(); } - protected void discover(List<AtlasEntity> entities) throws AtlasBaseException { - for (AtlasEntity entity : entities) { - AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); + protected void discoverAndValidate() throws AtlasBaseException { + EntityStream entityStream = discoveryContext.getEntityStream(); + + Set<String> walkedEntities = new HashSet<>(); + + // walk through top-level entities and find entity references + while (entityStream.hasNext()) { + AtlasEntity entity = entityStream.next(); + + if (entity != null) { + walkEntityGraph(entity); + + walkedEntities.add(entity.getGuid()); + } + } + + // walk through entities referenced by other entities + // referencedGuids will be updated within this for() loop; avoid use of iterators + List<String> referencedGuids = discoveryContext.getReferencedGuids(); + for (int i = 0; i < referencedGuids.size(); i++) { + String guid = referencedGuids.get(i); - if (type == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); + if (walkedEntities.contains(guid)) { + continue; } - discoveredEntities.addRootEntity(entity); + AtlasEntity entity = entityStream.getByGuid(guid); - walkEntityGraph(type, entity); + if (entity != null) { + walkEntityGraph(entity); + + walkedEntities.add(entity.getGuid()); + } } } protected void resolveReferences() throws AtlasBaseException { - for (EntityResolver resolver : entityResolvers) { - resolver.init(discoveredEntities); + EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(), + new UniqAttrBasedEntityResolver(typeRegistry) + }; - resolver.resolveEntityReferences(); + for (EntityResolver resolver : entityResolvers) { + resolver.resolveEntityReferences(discoveryContext); } + } - if (discoveredEntities.hasUnresolvedReferences()) { - throw new AtlasBaseException(AtlasErrorCode.UNRESOLVED_REFERENCES_FOUND, - discoveredEntities.getUnresolvedIds().toString(), - discoveredEntities.getUnresolvedIdsByUniqAttribs().toString()); + private void visitReference(AtlasEntityType type, Object val) throws AtlasBaseException { + if (type == null || val == null) { + return; } - } - private void visitReference(AtlasEntityType type, Object entity) throws AtlasBaseException { - if (entity != null) { - if (entity instanceof AtlasObjectId) { - AtlasObjectId objId = (AtlasObjectId)entity; + if (val instanceof AtlasObjectId) { + AtlasObjectId objId = (AtlasObjectId)val; + + if (!objId.isValid()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString()); + } - if (!objId.isValid()) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Invalid object id " + objId); - } + recordObjectReference(objId); + } else if (val instanceof Map) { + AtlasObjectId objId = new AtlasObjectId((Map)val); - if (!StringUtils.isEmpty(objId.getGuid()) && (objId.isAssignedGuid() || objId.isUnAssignedGuid())) { - discoveredEntities.addUnResolvedId(objId); - } else { - discoveredEntities.addUnresolvedIdByUniqAttribs(objId); - } - } else if (entity instanceof AtlasEntity) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Use AtlasObjectId to refer to another instance instead of AtlasEntity " + type.getTypeName()); - } else { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Invalid object type " + entity.getClass()); + if (!objId.isValid()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString()); } + + recordObjectReference(objId); + } else if (val instanceof AtlasEntity) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "found AtlasEntity"); + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString()); } } - void visitAttribute(AtlasStructType parentType, AtlasType attrType, AtlasAttributeDef attrDef, Object val) throws AtlasBaseException { - if (val != null) { - if ( isPrimitive(attrType.getTypeCategory()) ) { - return; - } - if (attrType.getTypeCategory() == TypeCategory.ARRAY) { - AtlasArrayType arrayType = (AtlasArrayType) attrType; - AtlasType elemType = arrayType.getElementType(); - - visitCollectionReferences(parentType, attrType, attrDef, elemType, val); - } else if (attrType.getTypeCategory() == TypeCategory.MAP) { - AtlasType keyType = ((AtlasMapType) attrType).getKeyType(); - AtlasType valueType = ((AtlasMapType) attrType).getValueType(); - - visitMapReferences(parentType, attrType, attrDef, keyType, valueType, val); - } else if (attrType.getTypeCategory() == TypeCategory.STRUCT) { - visitStruct((AtlasStructType)attrType, val); - } else if (attrType.getTypeCategory() == TypeCategory.ENTITY) { - visitReference((AtlasEntityType) attrType, val); - } + void visitAttribute(AtlasType attrType, Object val) throws AtlasBaseException { + if (attrType == null || val == null) { + return; + } + + if (isPrimitive(attrType.getTypeCategory()) ) { + return; + } + if (attrType.getTypeCategory() == TypeCategory.ARRAY) { + AtlasArrayType arrayType = (AtlasArrayType) attrType; + AtlasType elemType = arrayType.getElementType(); + + visitCollectionReferences(elemType, val); + } else if (attrType.getTypeCategory() == TypeCategory.MAP) { + AtlasType keyType = ((AtlasMapType) attrType).getKeyType(); + AtlasType valueType = ((AtlasMapType) attrType).getValueType(); + + visitMapReferences(keyType, valueType, val); + } else if (attrType.getTypeCategory() == TypeCategory.STRUCT) { + visitStruct((AtlasStructType)attrType, val); + } else if (attrType.getTypeCategory() == TypeCategory.ENTITY) { + visitReference((AtlasEntityType) attrType, val); } } - void visitMapReferences(AtlasStructType parentType, final AtlasType attrType, AtlasAttributeDef attrDef, AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException { + void visitMapReferences(AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException { + if (keyType == null || valueType == null || val == null) { + return; + } + if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) { return; } - if (val != null) { - if (Map.class.isAssignableFrom(val.getClass())) { - Iterator<Map.Entry> it = ((Map) val).entrySet().iterator(); - while (it.hasNext()) { - Map.Entry e = it.next(); - visitAttribute(parentType, keyType, attrDef, e.getKey()); - visitAttribute(parentType, valueType, attrDef, e.getValue()); - } + if (Map.class.isAssignableFrom(val.getClass())) { + Iterator<Map.Entry> it = ((Map) val).entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + visitAttribute(keyType, e.getKey()); + visitAttribute(valueType, e.getValue()); } } } - void visitCollectionReferences(final AtlasStructType parentType, final AtlasType attrType, final AtlasAttributeDef attrDef, AtlasType elemType, Object val) throws AtlasBaseException { - if (isPrimitive(elemType.getTypeCategory())) { + void visitCollectionReferences(AtlasType elemType, Object val) throws AtlasBaseException { + if (elemType == null || val == null || isPrimitive(elemType.getTypeCategory())) { return; } - if (val != null) { - Iterator it = null; - if (val instanceof Collection) { - it = ((Collection) val).iterator(); - } else if (val instanceof Iterable) { - it = ((Iterable) val).iterator(); - } else if (val instanceof Iterator) { - it = (Iterator) val; - } - if (it != null) { - while (it.hasNext()) { - Object elem = it.next(); - visitAttribute(parentType, elemType, attrDef, elem); - } + Iterator it = null; + + if (val instanceof Collection) { + it = ((Collection) val).iterator(); + } else if (val instanceof Iterable) { + it = ((Iterable) val).iterator(); + } else if (val instanceof Iterator) { + it = (Iterator) val; + } + + if (it != null) { + while (it.hasNext()) { + Object elem = it.next(); + visitAttribute(elemType, elem); } } } void visitStruct(AtlasStructType structType, Object val) throws AtlasBaseException { - if (structType == null) { + if (structType == null || val == null) { return; } - for (AtlasStructType.AtlasAttribute attribute : structType.getAllAttributes().values()) { + AtlasStruct struct; + + if (val instanceof AtlasStruct) { + struct = (AtlasStruct) val; + } else if (val instanceof Map) { + Map attributes = AtlasTypeUtil.toStructAttributes((Map) val); + + struct = new AtlasStruct(structType.getTypeName(), attributes); + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString()); + } + + for (AtlasAttribute attribute : structType.getAllAttributes().values()) { AtlasType attrType = attribute.getAttributeType(); - Object attrVal = ((AtlasStruct) val).getAttribute(attribute.getName()); + Object attrVal = struct.getAttribute(attribute.getName()); - visitAttribute(structType, attrType, attribute.getAttributeDef(), attrVal); + visitAttribute(attrType, attrVal); } } - void walkEntityGraph(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException { - visitStruct(entityType, entity); + void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException { + if (entity == null) { + return; + } + + validateAndNormalize(entity); + AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + recordObjectReference(entity.getGuid()); + + visitStruct(type, entity); } boolean isPrimitive(TypeCategory typeCategory) { return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM; } + + private void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException { + List<String> messages = new ArrayList<>(); + + if (!AtlasEntity.isAssigned(entity.getGuid()) && !AtlasEntity.isUnAssigned(entity.getGuid())) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid()); + } + + AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (type == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); + } + + type.validateValue(entity, entity.getTypeName(), messages); + + if (!messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); + } + + type.getNormalizedValue(entity); + } + + private void recordObjectReference(String guid) { + discoveryContext.addReferencedGuid(guid); + } + + private void recordObjectReference(AtlasObjectId objId) { + if (objId.isValidGuid()) { + discoveryContext.addReferencedGuid(objId.getGuid()); + } else { + discoveryContext.addReferencedByUniqAttribs(objId); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 566207b..1f4ad57 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -19,234 +19,217 @@ package org.apache.atlas.repository.store.graph.v1; import com.google.inject.Inject; +import com.google.inject.Singleton; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.GraphTransaction; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; -import org.apache.atlas.model.instance.AtlasEntity.Status; -import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.graph.AtlasGraphProvider; -import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasGraphQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; -import org.apache.atlas.repository.store.graph.EntityResolver; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; +@Singleton public class AtlasEntityStoreV1 implements AtlasEntityStore { + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class); - protected AtlasTypeRegistry typeRegistry; - - private final EntityGraphMapper graphMapper; - private final AtlasGraph graph; - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class); + private final DeleteHandlerV1 deleteHandler; + private final AtlasTypeRegistry typeRegistry; @Inject - public AtlasEntityStoreV1(EntityGraphMapper vertexMapper) { - this.graphMapper = vertexMapper; - this.graph = AtlasGraphProvider.getGraphInstance(); + public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) { + this.deleteHandler = deleteHandler; + this.typeRegistry = typeRegistry; } - @Inject - public void init(AtlasTypeRegistry typeRegistry) throws AtlasBaseException { - this.typeRegistry = typeRegistry; + @Override + @GraphTransaction + public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getById({})", guid); + } + + EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry); + + AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid); + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getById({}): {}", guid, ret); + } + + return ret; } @Override - public AtlasEntityWithExtInfo getById(final String guid) throws AtlasBaseException { + @GraphTransaction + public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("Retrieving entity with guid={}", guid); + LOG.debug("==> getByIds({})", guids); } EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry); - return entityRetriever.toAtlasEntityWithExtInfo(guid); + AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getByIds({}): {}", guids, ret); + } + + return ret; } @Override - public AtlasEntityWithExtInfo getByUniqueAttribute(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException { - String entityTypeName = entityType.getTypeName(); - + @GraphTransaction + public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) + throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("Retrieving entity with type={} and attributes={}: values={}", entityTypeName, uniqAttributes); + LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes); } AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes); EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry); - return entityRetriever.toAtlasEntityWithExtInfo(entityVertex); - } + AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex); - @Override - public EntityMutationResponse deleteById(final String guid) { - return null; + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(), + uniqAttributes.toString()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret); + } + + return ret; } @Override @GraphTransaction - public EntityMutationResponse createOrUpdate(final Map<String, AtlasEntity> entities) throws AtlasBaseException { - + public EntityMutationResponse createOrUpdate(EntityStream entityStream) throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityStoreV1.createOrUpdate({}, {})", entities); + LOG.debug("==> createOrUpdate()"); } - //Validate - List<AtlasEntity> normalizedEntities = validateAndNormalize(entities); + if (entityStream == null || !entityStream.hasNext()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); + } - //Discover entities, create vertices - EntityMutationContext ctx = preCreateOrUpdate(normalizedEntities); + EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteHandler, typeRegistry); + + // Create/Update entities + EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper); + + EntityMutationResponse ret = entityGraphMapper.mapAttributes(context); + + ret.setGuidAssignments(context.getGuidAssignments()); if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasStructDefStoreV1.createOrUpdate({}, {}): {}", entities); + LOG.debug("<== createOrUpdate()"); } - return graphMapper.mapAttributes(ctx); + return ret; } @Override - public AtlasEntitiesWithExtInfo getByIds(final List<String> guids) throws AtlasBaseException { - return null; + @GraphTransaction + public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes, + AtlasEntity entity) throws AtlasBaseException { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "updateByUniqueAttributes() not implemented yet"); } @Override - public EntityMutationResponse deleteByIds(final List<String> guid) throws AtlasBaseException { - return null; + @GraphTransaction + public EntityMutationResponse deleteById(String guid) throws AtlasBaseException { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteById() not implemented yet"); } @Override - public EntityMutationResponse updateByUniqueAttribute(final String typeName, final String attributeName, final String attributeValue, final AtlasEntity entity) throws AtlasBaseException { - return null; + @GraphTransaction + public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) + throws AtlasBaseException { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteByUniqueAttributes() not implemented yet"); } @Override - public EntityMutationResponse deleteByUniqueAttribute(final String typeName, final String attributeName, final String attributeValue) throws AtlasBaseException { - return null; + @GraphTransaction + public EntityMutationResponse deleteByIds(List<String> guids) throws AtlasBaseException { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteByIds() not implemented yet"); } @Override - public void addClassifications(final String guid, final List<AtlasClassification> classification) throws AtlasBaseException { - + @GraphTransaction + public void addClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "addClassifications() not implemented yet"); } @Override - public void updateClassifications(final String guid, final List<AtlasClassification> classification) throws AtlasBaseException { - + @GraphTransaction + public void updateClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "updateClassifications() not implemented yet"); } @Override - public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException { - + @GraphTransaction + public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteClassifications() not implemented yet"); } - private EntityMutationContext preCreateOrUpdate(final List<AtlasEntity> atlasEntities) throws AtlasBaseException { - List<EntityResolver> entityResolvers = new ArrayList<>(); - entityResolvers.add(new IDBasedEntityResolver()); - entityResolvers.add(new UniqAttrBasedEntityResolver(typeRegistry)); + private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper) throws AtlasBaseException { + EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream); + EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities(); + EntityMutationContext context = new EntityMutationContext(discoveryContext); - EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityResolvers); - EntityGraphDiscoveryContext discoveredEntities = graphDiscoverer.discoverEntities(atlasEntities); - EntityMutationContext context = new EntityMutationContext(discoveredEntities); + for (String guid : discoveryContext.getReferencedGuids()) { + AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid); + AtlasEntity entity = entityStream.getByGuid(guid); - for (AtlasEntity entity : discoveredEntities.getRootEntities()) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity); - } - - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - - if (entityType == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); - } - - final AtlasVertex vertex; - AtlasObjectId objId = entity.getAtlasObjectId(); + if (vertex != null) { + // entity would be null if guid is not in the stream but referenced by an entity in the stream + if (entity != null) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - if (discoveredEntities.isResolvedId(objId) ) { - vertex = discoveredEntities.getResolvedEntityVertex(objId); + context.addUpdated(entity, entityType, vertex); - context.addUpdated(entity, entityType, vertex); - - String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); - - RequestContextV1.get().recordEntityUpdate(new AtlasObjectId(entityType.getTypeName(), guid)); + RequestContextV1.get().recordEntityUpdate(entity.getAtlasObjectId()); + } } else { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + //Create vertices which do not exist in the repository - vertex = graphMapper.createVertexTemplate(entity, entityType); + vertex = entityGraphMapper.createVertex(entity); - context.addCreated(entity, entityType, vertex); + discoveryContext.addResolvedGuid(guid, vertex); - discoveredEntities.addResolvedId(objId, vertex); - discoveredEntities.removeUnResolvedId(objId); + String generatedGuid = AtlasGraphUtilsV1.getIdFromVertex(vertex); - String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); + entity.setGuid(generatedGuid); - RequestContextV1.get().recordEntityCreate(new AtlasObjectId(entityType.getTypeName(), guid)); - } + context.addCreated(guid, entity, entityType, vertex); - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity, vertex); + RequestContextV1.get().recordEntityCreate(entity.getAtlasObjectId()); } } return context; } - - private List<AtlasEntity> validateAndNormalize(final Map<String, AtlasEntity> entities) throws AtlasBaseException { - List<AtlasEntity> normalizedEntities = new ArrayList<>(); - List<String> messages = new ArrayList<>(); - - for (String entityId : entities.keySet()) { - if ( !AtlasEntity.isAssigned(entityId) && !AtlasEntity.isUnAssigned(entityId)) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, ": Guid in map key is invalid " + entityId); - } - - AtlasEntity entity = entities.get(entityId); - - if ( entity == null) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, ": Entity is null for guid " + entityId); - } - - AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); - if (type == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); - } - - type.validateValue(entity, entity.getTypeName(), messages); - - if ( !messages.isEmpty()) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); - } - - AtlasEntity normalizedEntity = (AtlasEntity) type.getNormalizedValue(entity); - - normalizedEntities.add(normalizedEntity); - } - - return normalizedEntities; - } - - public void cleanUp() throws AtlasBaseException { - } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java new file mode 100644 index 0000000..010b626 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; + +import java.util.Iterator; + +public class AtlasEntityStream implements EntityStream { + private AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(); + private Iterator<AtlasEntity> iterator; + + public AtlasEntityStream() { + } + + public AtlasEntityStream(AtlasEntity entity) { + this(new AtlasEntitiesWithExtInfo(entity)); + } + + public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) { + this(new AtlasEntitiesWithExtInfo(entityWithExtInfo)); + } + + public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) { + this.entitiesWithExtInfo = entitiesWithExtInfo; + this.iterator = this.entitiesWithExtInfo.getEntities().iterator(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public AtlasEntity next() { + return iterator.hasNext() ? iterator.next() : null; + } + + @Override + public void reset() { + this.iterator = entitiesWithExtInfo.getEntities().iterator(); + } + + @Override + public AtlasEntity getByGuid(String guid) { + return entitiesWithExtInfo.getEntity(guid); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer("AtlasEntityStream{"); + + sb.append("entitiesWithExtInfo=").append(entitiesWithExtInfo); + sb.append(", iterator=").append(iterator); + sb.append('}'); + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java index b17cf90..99f074b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java @@ -217,7 +217,7 @@ public class AtlasGraphUtilsV1 { if (entityVertex == null) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(), - uniqAttributes.keySet().toString(), uniqAttributes.values().toString()); + uniqAttributes.toString()); } return entityVertex; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java new file mode 100644 index 0000000..b6d82dd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + + +import org.apache.atlas.model.instance.EntityMutations.EntityOperation; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; + + +import java.util.Objects; + +public class AttributeMutationContext { + private EntityOperation op; + /** + * Atlas Attribute + */ + + private AtlasAttribute attribute; + + /** + * Overriding type for which elements are being mapped + */ + private AtlasType currentElementType; + + /** + * Current attribute value/entity/Struct instance + */ + private Object value; + + private String vertexProperty; + + /** + * + * The vertex which corresponds to the entity/struct for which we are mapping a complex attributes like struct, traits + */ + AtlasVertex referringVertex; + + /** + * The current edge(in case of updates) from the parent entity/struct to the complex attribute like struct, trait + */ + AtlasEdge existingEdge; + + public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value) { + this(op, referringVertex, attribute, value, attribute.getVertexPropertyName(), null, null); + } + + public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value, + String vertexProperty, AtlasType currentElementType, AtlasEdge currentEdge) { + this.op = op; + this.referringVertex = referringVertex; + this.attribute = attribute; + this.value = value; + this.vertexProperty = vertexProperty; + this.currentElementType = currentElementType; + this.existingEdge = currentEdge; + } + + @Override + public int hashCode() { + return Objects.hash(op, referringVertex, attribute, value, vertexProperty, currentElementType, existingEdge); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } else if (obj == this) { + return true; + } else if (obj.getClass() != getClass()) { + return false; + } else { + AttributeMutationContext rhs = (AttributeMutationContext) obj; + return Objects.equals(op, rhs.op) + && Objects.equals(referringVertex, rhs.referringVertex) + && Objects.equals(attribute, rhs.attribute) + && Objects.equals(value, rhs.value) + && Objects.equals(vertexProperty, rhs.vertexProperty) + && Objects.equals(currentElementType, rhs.currentElementType) + && Objects.equals(existingEdge, rhs.existingEdge); + } + } + + + public AtlasStructType getParentType() { + return attribute.getDefinedInType(); + } + + public AtlasStructDef getStructDef() { + return attribute.getDefinedInDef(); + } + + public AtlasAttributeDef getAttributeDef() { + return attribute.getAttributeDef(); + } + + public AtlasType getAttrType() { + return currentElementType == null ? attribute.getAttributeType() : currentElementType; + } + + public AtlasType getCurrentElementType() { + return currentElementType; + } + + public Object getValue() { + return value; + } + + public String getVertexProperty() { return vertexProperty; } + + public AtlasVertex getReferringVertex() { return referringVertex; } + + public AtlasEdge getCurrentEdge() { + return existingEdge; + } + + public void setElementType(final AtlasType attrType) { + this.currentElementType = attrType; + } + + public AtlasAttribute getAttribute() { + return attribute; + } + + public EntityOperation getOp() { + return op; + } + + public void setExistingEdge(AtlasEdge existingEdge) { this.existingEdge = existingEdge; } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 14013fb..ff13ea5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -98,7 +98,7 @@ public abstract class DeleteHandlerV1 { // Record all deletion candidate GUIDs in RequestContext // and gather deletion candidate vertices. for (GraphHelper.VertexInfo vertexInfo : compositeVertices) { - requestContext.recordEntityDelete(new AtlasObjectId(vertexInfo.getTypeName(), vertexInfo.getGuid())); + requestContext.recordEntityDelete(new AtlasObjectId(vertexInfo.getGuid(), vertexInfo.getTypeName())); deletionCandidateVertices.add(vertexInfo.getVertex()); } } @@ -324,7 +324,7 @@ public abstract class DeleteHandlerV1 { String propertyName = AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(structType, attributeInfo.getName()); if (AtlasGraphUtilsV1.isReference(valueTypeCategory)) { - List<Object> keys = ArrayVertexMapper.getArrayElementsProperty(keyType, instanceVertex, propertyName); + List<Object> keys = EntityGraphMapper.getArrayElementsProperty(keyType, instanceVertex, propertyName); if (keys != null) { for (Object key : keys) { String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, (String) key); @@ -513,7 +513,7 @@ public abstract class DeleteHandlerV1 { GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, requestContext.getRequestTime()); GraphHelper.setProperty(outVertex, Constants.MODIFIED_BY_KEY, requestContext.getUser()); - requestContext.recordEntityUpdate(new AtlasObjectId(typeName, outId)); + requestContext.recordEntityUpdate(new AtlasObjectId(outId, typeName)); } }
