http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java new file mode 100644 index 0000000..bc03aeb --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java @@ -0,0 +1,508 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory; +import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags; +import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; +import org.apache.atlas.query.AtlasDSL; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasRelationshipType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.authorize.AtlasTypeAccessRequest; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * RelationshipDef store in v1 format. + */ +public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRelationshipDef> { + private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipDefStoreV2.class); + + @Inject + public AtlasRelationshipDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) { + super(typeDefStore, typeRegistry); + } + + @Override + public AtlasVertex preCreate(AtlasRelationshipDef relationshipDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.preCreate({})", relationshipDef); + } + + validateType(relationshipDef); + + AtlasType type = typeRegistry.getType(relationshipDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name()); + } + + AtlasVertex relationshipDefVertex = typeDefStore.findTypeVertexByName(relationshipDef.getName()); + + if (relationshipDefVertex != null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, relationshipDef.getName()); + } + + relationshipDefVertex = typeDefStore.createTypeVertex(relationshipDef); + + updateVertexPreCreate(relationshipDef, (AtlasRelationshipType) type, relationshipDefVertex); + + final AtlasRelationshipEndDef endDef1 = relationshipDef.getEndDef1(); + final AtlasRelationshipEndDef endDef2 = relationshipDef.getEndDef2(); + final String type1 = endDef1.getType(); + final String type2 = endDef2.getType(); + final String name1 = endDef1.getName(); + final String name2 = endDef2.getName(); + final AtlasVertex end1TypeVertex = typeDefStore.findTypeVertexByName(type1); + final AtlasVertex end2TypeVertex = typeDefStore.findTypeVertexByName(type2); + + if (end1TypeVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND, relationshipDef.getName(), type1); + } + + if (end2TypeVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND, relationshipDef.getName(), type2); + } + + // create an edge between the relationshipDef and each of the entityDef vertices. + AtlasEdge edge1 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end1TypeVertex, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL); + + /* + Where edge1 and edge2 have the same names and types we do not need a second edge. + We are not invoking the equals method on the AtlasRelationshipedDef, as we only want 1 edge even if propagateTags or other properties are different. + */ + + if (type1.equals(type2) && name1.equals(name2)) { + if (LOG.isDebugEnabled()) { + LOG.debug("AtlasRelationshipDefStoreV1.preCreate({}): created relationshipDef vertex {}," + + " and one edge as {}, because end1 and end2 have the same type and name", relationshipDef, relationshipDefVertex, edge1); + } + + } else { + AtlasEdge edge2 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end2TypeVertex, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL); + if (LOG.isDebugEnabled()) { + LOG.debug("AtlasRelationshipDefStoreV1.preCreate({}): created relationshipDef vertex {}," + + " edge1 as {}, edge2 as {} ", relationshipDef, relationshipDefVertex, edge1, edge2); + } + + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.preCreate({}): {}", relationshipDef, relationshipDefVertex); + } + return relationshipDefVertex; + } + + @Override + public AtlasRelationshipDef create(AtlasRelationshipDef relationshipDef, AtlasVertex preCreateResult) + throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.create({}, {})", relationshipDef, preCreateResult); + } + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, relationshipDef), "create relationship-def ", relationshipDef.getName()); + + AtlasVertex vertex = (preCreateResult == null) ? preCreate(relationshipDef) : preCreateResult; + + AtlasRelationshipDef ret = toRelationshipDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.create({}, {}): {}", relationshipDef, preCreateResult, ret); + } + + return ret; + } + + @Override + public List<AtlasRelationshipDef> getAll() throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.getAll()"); + } + + List<AtlasRelationshipDef> ret = new ArrayList<>(); + Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.RELATIONSHIP); + + while (vertices.hasNext()) { + ret.add(toRelationshipDef(vertices.next())); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.getAll(): count={}", ret.size()); + } + + return ret; + } + + @Override + public AtlasRelationshipDef getByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.getByName({})", name); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); + + AtlasRelationshipDef ret = toRelationshipDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.getByName({}): {}", name, ret); + } + + return ret; + } + + @Override + public AtlasRelationshipDef getByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.getByGuid({})", guid); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + AtlasRelationshipDef ret = toRelationshipDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.getByGuid({}): {}", guid, ret); + } + + return ret; + } + + @Override + public AtlasRelationshipDef update(AtlasRelationshipDef relationshipDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.update({})", relationshipDef); + } + + validateType(relationshipDef); + + AtlasRelationshipDef ret = StringUtils.isNotBlank(relationshipDef.getGuid()) + ? updateByGuid(relationshipDef.getGuid(), relationshipDef) + : updateByName(relationshipDef.getName(), relationshipDef); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.update({}): {}", relationshipDef, ret); + } + + return ret; + } + + @Override + public AtlasRelationshipDef updateByName(String name, AtlasRelationshipDef relationshipDef) + throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.updateByName({}, {})", name, relationshipDef); + } + + AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update relationship-def ", name); + + validateType(relationshipDef); + + AtlasType type = typeRegistry.getType(relationshipDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex); + + AtlasRelationshipDef ret = toRelationshipDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.updateByName({}, {}): {}", name, relationshipDef, ret); + } + + return ret; + } + + @Override + public AtlasRelationshipDef updateByGuid(String guid, AtlasRelationshipDef relationshipDef) + throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.updateByGuid({})", guid); + } + + AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update relationship-Def ", (existingDef != null ? existingDef.getName() : guid)); + + validateType(relationshipDef); + + AtlasType type = typeRegistry.getTypeByGuid(guid); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex); + // updates should not effect the edges between the types as we do not allow updates that change the endpoints. + + AtlasRelationshipDef ret = toRelationshipDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.updateByGuid({}): {}", guid, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.preDeleteByName({})", name); + } + + AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete relationship-def ", name); + + AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP); + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(name)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.preDeleteByName({}): {}", name, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRelationshipDefStoreV1.preDeleteByGuid({})", guid); + } + + AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete relationship-def ", (existingDef != null ? existingDef.getName() : guid)); + + AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP); + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + String typeName = AtlasGraphUtilsV2.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class); + + if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(typeName)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRelationshipDefStoreV1.preDeleteByGuid({}): {}", guid, ret); + } + + return ret; + } + + private void updateVertexPreCreate(AtlasRelationshipDef relationshipDef, AtlasRelationshipType relationshipType, + AtlasVertex vertex) throws AtlasBaseException { + AtlasRelationshipEndDef end1 = relationshipDef.getEndDef1(); + AtlasRelationshipEndDef end2 = relationshipDef.getEndDef2(); + + // check whether the names added on the relationship Ends are reserved if required. + final boolean allowReservedKeywords; + try { + allowReservedKeywords = ApplicationProperties.get().getBoolean(ALLOW_RESERVED_KEYWORDS, true); + } catch (AtlasException e) { + throw new AtlasBaseException(e); + } + + if (!allowReservedKeywords) { + if (AtlasDSL.Parser.isKeyword(end1.getName())) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END1_NAME_INVALID, end1.getName()); + } + + if (AtlasDSL.Parser.isKeyword(end2.getName())) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END2_NAME_INVALID, end2.getName()); + } + } + + AtlasStructDefStoreV2.updateVertexPreCreate(relationshipDef, relationshipType, vertex, typeDefStore); + // Update ends + setVertexPropertiesFromRelationshipDef(relationshipDef, vertex); + } + + private void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, AtlasRelationshipType relationshipType, + AtlasVertex vertex) throws AtlasBaseException { + // We will not support an update to endpoints or category key + AtlasRelationshipDef existingRelationshipDef = toRelationshipDef(vertex); + + preUpdateCheck(newRelationshipDef, existingRelationshipDef); + // we do allow change to tag propagation and the addition of new attributes. + + AtlasStructDefStoreV2.updateVertexPreUpdate(newRelationshipDef, relationshipType, vertex, typeDefStore); + + setVertexPropertiesFromRelationshipDef(newRelationshipDef, vertex); + } + + /** + * Check ends are the same and relationshipCategory is the same. + * + * We do this by comparing 2 relationshipDefs to avoid exposing the AtlasVertex to unit testing. + * + * @param newRelationshipDef + * @param existingRelationshipDef + * @throws AtlasBaseException + */ + public static void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, AtlasRelationshipDef existingRelationshipDef) throws AtlasBaseException { + // do not allow renames of the Def. + String existingName = existingRelationshipDef.getName(); + String newName = newRelationshipDef.getName(); + + if (!existingName.equals(newName)) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_NAME_UPDATE, + newRelationshipDef.getGuid(),existingName, newName); + } + + RelationshipCategory existingRelationshipCategory = existingRelationshipDef.getRelationshipCategory(); + RelationshipCategory newRelationshipCategory = newRelationshipDef.getRelationshipCategory(); + + if ( !existingRelationshipCategory.equals(newRelationshipCategory)){ + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE, + newRelationshipDef.getName(),newRelationshipCategory.name(), + existingRelationshipCategory.name() ); + } + + AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1(); + AtlasRelationshipEndDef newEnd1 = newRelationshipDef.getEndDef1(); + + if ( !newEnd1.equals(existingEnd1) ) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE, + newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString()); + } + + AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2(); + AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2(); + + if ( !newEnd2.equals(existingEnd2) ) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE, + newRelationshipDef.getName(), newEnd2.toString(), existingEnd2.toString()); + } + } + + public static void setVertexPropertiesFromRelationshipDef(AtlasRelationshipDef relationshipDef, AtlasVertex vertex) { + vertex.setProperty(Constants.RELATIONSHIPTYPE_END1_KEY, AtlasType.toJson(relationshipDef.getEndDef1())); + vertex.setProperty(Constants.RELATIONSHIPTYPE_END2_KEY, AtlasType.toJson(relationshipDef.getEndDef2())); + // default the relationship category to association if it has not been specified. + String relationshipCategory = RelationshipCategory.ASSOCIATION.name(); + if (relationshipDef.getRelationshipCategory()!=null) { + relationshipCategory =relationshipDef.getRelationshipCategory().name(); + } + // Update RelationshipCategory + vertex.setProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, relationshipCategory); + + if (relationshipDef.getPropagateTags() == null) { + vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, AtlasRelationshipDef.PropagateTags.NONE.name()); + } else { + vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, relationshipDef.getPropagateTags().name()); + } + } + + private AtlasRelationshipDef toRelationshipDef(AtlasVertex vertex) throws AtlasBaseException { + AtlasRelationshipDef ret = null; + + if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.RELATIONSHIP)) { + String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); + String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class); + String version = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class); + String end1Str = vertex.getProperty(Constants.RELATIONSHIPTYPE_END1_KEY, String.class); + String end2Str = vertex.getProperty(Constants.RELATIONSHIPTYPE_END2_KEY, String.class); + String relationStr = vertex.getProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, String.class); + String propagateStr = vertex.getProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, String.class); + + // set the ends + AtlasRelationshipEndDef endDef1 = AtlasType.fromJson(end1Str, AtlasRelationshipEndDef.class); + AtlasRelationshipEndDef endDef2 = AtlasType.fromJson(end2Str, AtlasRelationshipEndDef.class); + + // set the relationship Category + RelationshipCategory relationshipCategory = null; + for (RelationshipCategory value : RelationshipCategory.values()) { + if (value.name().equals(relationStr)) { + relationshipCategory = value; + } + } + + // set the propagateTags + PropagateTags propagateTags = null; + for (PropagateTags value : PropagateTags.values()) { + if (value.name().equals(propagateStr)) { + propagateTags = value; + } + } + + ret = new AtlasRelationshipDef(name, description, version, relationshipCategory, propagateTags, endDef1, endDef2); + + // add in the attributes + AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore); + } + + return ret; + } + +}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java new file mode 100644 index 0000000..eb1079c --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java @@ -0,0 +1,837 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.annotation.GraphTransaction; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity.Status; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelationship; +import org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo; +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags; +import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasRelationshipType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; +import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE; +import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices; +import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName; + +@Component +public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { + private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipStoreV2.class); + + private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L; + + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityRetriever; + private final DeleteHandlerV1 deleteHandler; + private final GraphHelper graphHelper = GraphHelper.getInstance(); + private final AtlasEntityChangeNotifier entityChangeNotifier; + + @Inject + public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler, AtlasEntityChangeNotifier entityChangeNotifier) { + this.typeRegistry = typeRegistry; + this.entityRetriever = new EntityGraphRetriever(typeRegistry); + this.deleteHandler = deleteHandler; + this.entityChangeNotifier = entityChangeNotifier; + } + + @Override + @GraphTransaction + public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> create({})", relationship); + } + + AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1()); + AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2()); + + validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes()); + + AtlasEdge edge = createRelationship(end1Vertex, end2Vertex, relationship); + + AtlasRelationship ret = edge != null ? entityRetriever.mapEdgeToAtlasRelationship(edge) : null; + + if (LOG.isDebugEnabled()) { + LOG.debug("<== create({}): {}", relationship, ret); + } + + // notify entities for added/removed classification propagation + entityChangeNotifier.notifyPropagatedEntities(); + + return ret; + } + + @Override + @GraphTransaction + public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> update({})", relationship); + } + + String guid = relationship.getGuid(); + + if (StringUtils.isEmpty(guid)) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid); + } + + AtlasEdge edge = graphHelper.getEdgeForGUID(guid); + String edgeType = AtlasGraphUtilsV2.getTypeName(edge); + AtlasVertex end1Vertex = edge.getOutVertex(); + AtlasVertex end2Vertex = edge.getInVertex(); + + // update shouldn't change endType + if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED, guid, edgeType, relationship.getTypeName()); + } + + // update shouldn't change ends + if (relationship.getEnd1() != null) { + String updatedEnd1Guid = relationship.getEnd1().getGuid(); + + if (updatedEnd1Guid == null) { + AtlasVertex updatedEnd1Vertex = getVertexFromEndPoint(relationship.getEnd1()); + + updatedEnd1Guid = updatedEnd1Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd1Vertex); + } + + if (updatedEnd1Guid != null) { + String end1Guid = AtlasGraphUtilsV2.getIdFromVertex(end1Vertex); + + if (!StringUtils.equalsIgnoreCase(relationship.getEnd1().getGuid(), end1Guid)) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, edgeType, guid, end1Guid, relationship.getEnd1().getGuid()); + } + } + } + + // update shouldn't change ends + if (relationship.getEnd2() != null) { + String updatedEnd2Guid = relationship.getEnd2().getGuid(); + + if (updatedEnd2Guid == null) { + AtlasVertex updatedEnd2Vertex = getVertexFromEndPoint(relationship.getEnd2()); + + updatedEnd2Guid = updatedEnd2Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd2Vertex); + } + + if (updatedEnd2Guid != null) { + String end2Guid = AtlasGraphUtilsV2.getIdFromVertex(end2Vertex); + + if (!StringUtils.equalsIgnoreCase(relationship.getEnd2().getGuid(), end2Guid)) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, AtlasGraphUtilsV2.getTypeName(edge), guid, end2Guid, relationship.getEnd2().getGuid()); + } + } + } + + + validateRelationship(end1Vertex, end2Vertex, edgeType, relationship.getAttributes()); + + AtlasRelationship ret = updateRelationship(edge, relationship); + + // notify entities for added/removed classification propagation + entityChangeNotifier.notifyPropagatedEntities(); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== update({}): {}", relationship, ret); + } + + return ret; + } + + @Override + @GraphTransaction + public AtlasRelationship getById(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getById({})", guid); + } + + AtlasEdge edge = graphHelper.getEdgeForGUID(guid); + AtlasRelationship ret = entityRetriever.mapEdgeToAtlasRelationship(edge); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getById({}): {}", guid, ret); + } + + return ret; + } + + @Override + @GraphTransaction + public AtlasRelationshipWithExtInfo getExtInfoById(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getExtInfoById({})", guid); + } + + AtlasEdge edge = graphHelper.getEdgeForGUID(guid); + AtlasRelationshipWithExtInfo ret = entityRetriever.mapEdgeToAtlasRelationshipWithExtInfo(edge); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getExtInfoById({}): {}", guid, ret); + } + + return ret; + } + + @Override + @GraphTransaction + public void deleteById(String guid) throws AtlasBaseException { + deleteById(guid, false); + } + + @Override + @GraphTransaction + public void deleteById(String guid, boolean forceDelete) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> deleteById({}, {})", guid, forceDelete); + } + + if (StringUtils.isEmpty(guid)) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, " empty/null guid"); + } + + AtlasEdge edge = graphHelper.getEdgeForGUID(guid); + + if (edge == null) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid); + } + + if (getState(edge) == DELETED) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid); + } + + deleteHandler.deleteRelationships(Collections.singleton(edge), forceDelete); + + // notify entities for added/removed classification propagation + entityChangeNotifier.notifyPropagatedEntities(); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== deleteById({}): {}", guid); + } + } + + @Override + public AtlasEdge getOrCreate(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException { + AtlasEdge ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName()); + + if (ret == null) { + validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes()); + + ret = createRelationship(end1Vertex, end2Vertex, relationship); + } + + return ret; + } + + @Override + public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getOrCreate({})", relationship); + } + + validateRelationship(relationship); + + AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1()); + AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2()); + AtlasRelationship ret = null; + + // check if relationship exists + AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName()); + + if (relationshipEdge == null) { + validateRelationship(relationship); + + relationshipEdge = createRelationship(end1Vertex, end2Vertex, relationship); + } + + if (relationshipEdge != null){ + ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getOrCreate({}): {}", relationship, ret); + } + + return ret; + } + + private AtlasEdge createRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException { + AtlasEdge ret = null; + + try { + ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName()); + + if (ret == null) { + ret = createRelationshipEdge(end1Vertex, end2Vertex, relationship); + + AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); + + if (MapUtils.isNotEmpty(relationType.getAllAttributes())) { + for (AtlasAttribute attr : relationType.getAllAttributes().values()) { + String attrName = attr.getName(); + String attrVertexProperty = attr.getVertexPropertyName(); + Object attrValue = relationship.getAttribute(attrName); + + AtlasGraphUtilsV2.setProperty(ret, attrVertexProperty, attrValue); + } + } + } else { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(), + AtlasGraphUtilsV2.getIdFromVertex(end1Vertex), AtlasGraphUtilsV2.getIdFromVertex(end2Vertex)); + } + } catch (RepositoryException e) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + + return ret; + } + + private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException { + AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); + + updateTagPropagations(relationshipEdge, relationship); + + if (MapUtils.isNotEmpty(relationType.getAllAttributes())) { + for (AtlasAttribute attr : relationType.getAllAttributes().values()) { + String attrName = attr.getName(); + String attrVertexProperty = attr.getVertexPropertyName(); + + if (relationship.hasAttribute(attrName)) { + AtlasGraphUtilsV2.setProperty(relationshipEdge, attrVertexProperty, relationship.getAttribute(attrName)); + } + } + } + + return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge); + } + + private void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedPropagatedClassifications) throws AtlasBaseException { + if (blockedPropagatedClassifications != null) { + List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge); + List<String> currentClassificationIds = getBlockedClassificationIds(edge); + List<AtlasVertex> currentBlockedPropagatedClassificationVertices = getBlockedClassificationVertices(propagatedClassificationVertices, currentClassificationIds); + List<AtlasVertex> updatedBlockedPropagatedClassificationVertices = new ArrayList<>(); + List<String> updatedClassificationIds = new ArrayList<>(); + + for (AtlasClassification classification : blockedPropagatedClassifications) { + AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatedClassificationVertices, classification); + + // ignore invalid blocked propagated classification + if (classificationVertex == null) { + continue; + } + + updatedBlockedPropagatedClassificationVertices.add(classificationVertex); + + String classificationId = classificationVertex.getIdForDisplay(); + + updatedClassificationIds.add(classificationId); + } + + addToBlockedClassificationIds(edge, updatedClassificationIds); + + // remove propagated tag for added entry + List<AtlasVertex> addedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(updatedBlockedPropagatedClassificationVertices, currentBlockedPropagatedClassificationVertices); + + for (AtlasVertex classificationVertex : addedBlockedClassifications) { + List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex); + + deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices); + } + + // add propagated tag for removed entry + List<AtlasVertex> removedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(currentBlockedPropagatedClassificationVertices, updatedBlockedPropagatedClassificationVertices); + + for (AtlasVertex classificationVertex : removedBlockedClassifications) { + List<AtlasVertex> addPropagationToVertices = graphHelper.getPropagatedEntityVertices(classificationVertex); + + deleteHandler.addTagPropagation(classificationVertex, addPropagationToVertices); + } + } + } + + private List<AtlasVertex> getBlockedClassificationVertices(List<AtlasVertex> classificationVertices, List<String> blockedClassificationIds) { + List<AtlasVertex> ret = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(blockedClassificationIds)) { + for (AtlasVertex classificationVertex : classificationVertices) { + String classificationId = classificationVertex.getIdForDisplay(); + + if (blockedClassificationIds.contains(classificationId)) { + ret.add(classificationVertex); + } + } + } + + return ret; + } + + // propagated classifications should contain blocked propagated classification + private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) { + AtlasVertex ret = null; + + for (AtlasVertex vertex : classificationVertices) { + String classificationName = getClassificationName(vertex); + String entityGuid = getClassificationEntityGuid(vertex); + + if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) { + ret = vertex; + break; + } + } + + return ret; + } + + private void addToBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) { + if (edge != null) { + if (classificationIds.isEmpty()) { + edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY); + } else { + edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds); + } + } + } + + private void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException { + PropagateTags oldTagPropagation = getPropagateTags(edge); + PropagateTags newTagPropagation = relationship.getPropagateTags(); + + if (newTagPropagation != oldTagPropagation) { + List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge); + Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices); + + // Update propagation edge + AtlasGraphUtilsV2.setProperty(edge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name()); + + List<AtlasVertex> updatedClassificationVertices = getClassificationVertices(edge); + List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices); + Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion); + + // compute add/remove propagations list + Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>(); + Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>(); + + if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) { + addPropagationsMap.putAll(updatedClassificationsMap); + + } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) { + removePropagationsMap.putAll(currentClassificationsMap); + + } else { + for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) { + List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList(); + List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList(); + + List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities); + List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities); + + if (CollectionUtils.isNotEmpty(entitiesAdded)) { + addPropagationsMap.put(classificationVertex, entitiesAdded); + } + + if (CollectionUtils.isNotEmpty(entitiesRemoved)) { + removePropagationsMap.put(classificationVertex, entitiesRemoved); + } + } + } + + for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) { + deleteHandler.addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex)); + } + + for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) { + deleteHandler.removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex)); + } + } else { + // update blocked propagated classifications only if there is no change is tag propagation (don't update both) + handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications()); + } + } + + private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException { + if (relationship == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null"); + } + + String relationshipName = relationship.getTypeName(); + String end1TypeName = getTypeNameFromObjectId(relationship.getEnd1()); + String end2TypeName = getTypeNameFromObjectId(relationship.getEnd2()); + AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName); + + if (relationshipType == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'"); + } + + if (relationship.getEnd1() == null || relationship.getEnd2() == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "end1/end2 is null"); + } + + boolean validEndTypes = false; + + if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) { + validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName); + } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) { + validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName); + } + + if (!validEndTypes) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName); + } + + validateEnds(relationship); + + validateAndNormalize(relationship); + } + + private void validateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, Map<String, Object> attributes) throws AtlasBaseException { + AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName); + + if (relationshipType == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'"); + } + + if (end1Vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd1Type().getTypeName()); + } + + if (end2Vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd2Type().getTypeName()); + } + + String end1TypeName = AtlasGraphUtilsV2.getTypeName(end1Vertex); + String end2TypeName = AtlasGraphUtilsV2.getTypeName(end2Vertex); + + boolean validEndTypes = false; + + if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) { + validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName); + } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) { + validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName); + } + + if (!validEndTypes) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName); + } + + List<String> messages = new ArrayList<>(); + AtlasRelationship relationship = new AtlasRelationship(relationshipName, attributes); + + relationshipType.validateValue(relationship, relationshipName, messages); + + if (!messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages); + } + + relationshipType.getNormalizedValue(relationship); + } + + + /** + * Validate the ends of the passed relationship + * @param relationship + * @throws AtlasBaseException + */ + private void validateEnds(AtlasRelationship relationship) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("validateEnds entry relationship:" + relationship); + } + List<AtlasObjectId> ends = new ArrayList<>(); + List<AtlasRelationshipEndDef> endDefs = new ArrayList<>(); + String relationshipTypeName = relationship.getTypeName(); + AtlasRelationshipDef relationshipDef = typeRegistry.getRelationshipDefByName(relationshipTypeName); + + ends.add(relationship.getEnd1()); + ends.add(relationship.getEnd2()); + endDefs.add(relationshipDef.getEndDef1()); + endDefs.add(relationshipDef.getEndDef2()); + + for (int i = 0; i < ends.size(); i++) { + AtlasObjectId end = ends.get(i); + String guid = end.getGuid(); + String typeName = end.getTypeName(); + Map<String, Object> uniqueAttributes = end.getUniqueAttributes(); + AtlasVertex endVertex = AtlasGraphUtilsV2.findByGuid(guid); + + if (!AtlasTypeUtil.isValidGuid(guid) || endVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + + } else if (MapUtils.isNotEmpty(uniqueAttributes)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + if (AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqueAttributes) == null) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, typeName, uniqueAttributes.toString()); + } + } else { + // check whether the guid is the correct type + String vertexTypeName = endVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); + + if (!Objects.equals(vertexTypeName, typeName)) { + String attrName = endDefs.get(i).getName(); + + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_INVALID_ENDTYPE, attrName, guid, vertexTypeName, typeName); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("validateEnds exit successfully validated relationship:" + relationship); + } + } + + private void validateAndNormalize(AtlasRelationship relationship) throws AtlasBaseException { + List<String> messages = new ArrayList<>(); + + if (! AtlasTypeUtil.isValidGuid(relationship.getGuid())) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, relationship.getGuid()); + } + + AtlasRelationshipType type = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); + + if (type == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.RELATIONSHIP.name(), relationship.getTypeName()); + } + + type.validateValue(relationship, relationship.getTypeName(), messages); + + if (!messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages); + } + + type.getNormalizedValue(relationship); + } + + public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipType) { + String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType); + Iterator<AtlasEdge> edgesIterator = getOutGoingEdgesByLabel(fromVertex, relationshipLabel); + AtlasEdge ret = null; + + while (edgesIterator != null && edgesIterator.hasNext()) { + AtlasEdge edge = edgesIterator.next(); + + if (edge != null) { + Status status = graphHelper.getStatus(edge); + + if ((status == null || status == ACTIVE) && + StringUtils.equals(getIdFromVertex(edge.getInVertex()), getIdFromVertex(toVertex))) { + ret = edge; + break; + } + } + } + + return ret; + } + + private Long getRelationshipVersion(AtlasRelationship relationship) { + Long ret = relationship != null ? relationship.getVersion() : null; + + return (ret != null) ? ret : DEFAULT_RELATIONSHIP_VERSION; + } + + private AtlasVertex getVertexFromEndPoint(AtlasObjectId endPoint) { + AtlasVertex ret = null; + + if (StringUtils.isNotEmpty(endPoint.getGuid())) { + ret = AtlasGraphUtilsV2.findByGuid(endPoint.getGuid()); + } else if (StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes())) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName()); + + ret = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, endPoint.getUniqueAttributes()); + } + + return ret; + } + + private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException, AtlasBaseException { + String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName()); + PropagateTags tagPropagation = getRelationshipTagPropagation(fromVertex, toVertex, relationship); + AtlasEdge ret = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created relationship edge from [{}] --> [{}] using edge label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), relationshipLabel); + } + + // map additional properties to relationship edge + if (ret != null) { + final String guid = UUID.randomUUID().toString(); + + AtlasGraphUtilsV2.setProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, relationship.getTypeName()); + AtlasGraphUtilsV2.setProperty(ret, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, guid); + AtlasGraphUtilsV2.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship)); + AtlasGraphUtilsV2.setProperty(ret, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name()); + + // blocked propagated classifications + handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications()); + + // propagate tags + deleteHandler.addTagPropagation(ret, tagPropagation); + } + + return ret; + } + + private PropagateTags getRelationshipTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) { + AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); + AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1(); + AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2(); + Set<String> fromVertexTypes = getTypeAndAllSuperTypes(getTypeName(fromVertex)); + Set<String> toVertexTypes = getTypeAndAllSuperTypes(getTypeName(toVertex)); + PropagateTags ret = relationshipType.getRelationshipDef().getPropagateTags(); + + // relationshipDef is defined as end1 (hive_db) and end2 (hive_table) and tagPropagation = ONE_TO_TWO + // relationship edge exists from [hive_table --> hive_db] + // swap the tagPropagation property for such cases. + if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) { + if (ret == ONE_TO_TWO) { + ret = TWO_TO_ONE; + } else if (ret == TWO_TO_ONE) { + ret = ONE_TO_TWO; + } + } + + return ret; + } + + private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipTypeName) { + if (LOG.isDebugEnabled()) { + LOG.debug("getRelationshipEdgeLabel({})", relationshipTypeName); + } + + AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName); + String ret = relationshipType.getRelationshipDef().getRelationshipLabel(); + AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1(); + AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2(); + Set<String> fromVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(fromVertex)); + Set<String> toVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(toVertex)); + AtlasAttribute attribute = null; + + // validate entity type and all its supertypes contains relationshipDefs end type + // e.g. [hive_process -> hive_table] -> [Process -> DataSet] + if (fromVertexTypes.contains(endDef1.getType()) && toVertexTypes.contains(endDef2.getType())) { + String attributeName = endDef1.getName(); + + attribute = relationshipType.getEnd1Type().getRelationshipAttribute(attributeName); + + } else if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) { + String attributeName = endDef2.getName(); + + attribute = relationshipType.getEnd2Type().getRelationshipAttribute(attributeName); + } + + if (attribute != null) { + ret = attribute.getRelationshipEdgeLabel(); + } + + return ret; + } + + public Set<String> getTypeAndAllSuperTypes(String entityTypeName) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + + return (entityType != null) ? entityType.getTypeAndAllSuperTypes() : new HashSet<String>(); + } + + private String getTypeNameFromObjectId(AtlasObjectId objectId) { + String typeName = objectId.getTypeName(); + + if (StringUtils.isBlank(typeName)) { + typeName = AtlasGraphUtilsV2.getTypeNameFromGuid(objectId.getGuid()); + } + + return typeName; + } + + /** + * Check whether this vertex has a relationship associated with this relationship type. + * @param vertex + * @param relationshipTypeName + * @return true if found an edge with this relationship type in. + */ + private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String relationshipTypeName) { + String relationshipEdgeLabel = getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName); + Iterator<AtlasEdge> iter = graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, relationshipEdgeLabel); + + return (iter != null) ? iter.hasNext() : false; + } + + private String getRelationshipEdgeLabel(String typeName, String relationshipTypeName) { + AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName); + AtlasRelationshipDef relationshipDef = relationshipType.getRelationshipDef(); + AtlasEntityType end1Type = relationshipType.getEnd1Type(); + AtlasEntityType end2Type = relationshipType.getEnd2Type(); + Set<String> vertexTypes = getTypeAndAllSuperTypes(typeName); + AtlasAttribute attribute = null; + + if (vertexTypes.contains(end1Type.getTypeName())) { + String attributeName = relationshipDef.getEndDef1().getName(); + + attribute = (attributeName != null) ? end1Type.getAttribute(attributeName) : null; + } else if (vertexTypes.contains(end2Type.getTypeName())) { + String attributeName = relationshipDef.getEndDef2().getName(); + + attribute = (attributeName != null) ? end2Type.getAttribute(attributeName) : null; + } + + return (attribute != null) ? attribute.getRelationshipEdgeLabel() : null; + } +} \ No newline at end of file