http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
new file mode 100644
index 0000000..bc03aeb
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import 
org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.query.AtlasDSL;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * RelationshipDef store in v2 format.
+ */
+public class AtlasRelationshipDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasRelationshipDef> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasRelationshipDefStoreV2.class);
+
+    /**
+     * Builds the relationship-def store.
+     *
+     * @param typeDefStore typedef graph store, passed through to the superclass
+     * @param typeRegistry type registry, passed through to the superclass
+     */
+    @Inject
+    public AtlasRelationshipDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) {
+        super(typeDefStore, typeRegistry);
+    }
+
+    /**
+     * Creates the type vertex for a new relationship-def and connects it to the type vertices
+     * of its two ends.
+     *
+     * @param relationshipDef the relationship-def to create
+     * @return the newly created relationship-def type vertex
+     * @throws AtlasBaseException if the registered type is not a relationship type, if a type
+     *                            vertex with the same name already exists, or if the type
+     *                            vertex of either end cannot be found
+     */
+    @Override
+    public AtlasVertex preCreate(AtlasRelationshipDef relationshipDef) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.preCreate({})", relationshipDef);
+        }
+
+        validateType(relationshipDef);
+
+        AtlasType type = typeRegistry.getType(relationshipDef.getName());
+
+        if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(),
+                                         TypeCategory.RELATIONSHIP.name());
+        }
+
+        AtlasVertex relationshipDefVertex = typeDefStore.findTypeVertexByName(relationshipDef.getName());
+
+        if (relationshipDefVertex != null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, relationshipDef.getName());
+        }
+
+        relationshipDefVertex = typeDefStore.createTypeVertex(relationshipDef);
+
+        updateVertexPreCreate(relationshipDef, (AtlasRelationshipType) type, relationshipDefVertex);
+
+        final AtlasRelationshipEndDef endDef1        = relationshipDef.getEndDef1();
+        final AtlasRelationshipEndDef endDef2        = relationshipDef.getEndDef2();
+        final String                  type1          = endDef1.getType();
+        final String                  type2          = endDef2.getType();
+        final String                  name1          = endDef1.getName();
+        final String                  name2          = endDef2.getName();
+        final AtlasVertex             end1TypeVertex = typeDefStore.findTypeVertexByName(type1);
+        final AtlasVertex             end2TypeVertex = typeDefStore.findTypeVertexByName(type2);
+
+        if (end1TypeVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND,
+                                         relationshipDef.getName(), type1);
+        }
+
+        if (end2TypeVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND,
+                                         relationshipDef.getName(), type2);
+        }
+
+        // create an edge between the relationshipDef and each of the entityDef vertices.
+        AtlasEdge edge1 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end1TypeVertex,
+                                                       AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL);
+
+        /*
+        Where end1 and end2 have the same names and types we do not need a second edge.
+        We are not invoking the equals method on the AtlasRelationshipEndDef, as we only want
+        1 edge even if propagateTags or other properties are different.
+        */
+        if (type1.equals(type2) && name1.equals(name2)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasRelationshipDefStoreV2.preCreate({}): created relationshipDef vertex {}," +
+                        " and one edge as {}, because end1 and end2 have the same type and name",
+                        relationshipDef, relationshipDefVertex, edge1);
+            }
+        } else {
+            AtlasEdge edge2 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end2TypeVertex,
+                                                           AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasRelationshipDefStoreV2.preCreate({}): created relationshipDef vertex {}," +
+                        " edge1 as {}, edge2 as {} ", relationshipDef, relationshipDefVertex, edge1, edge2);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.preCreate({}): {}", relationshipDef, relationshipDefVertex);
+        }
+
+        return relationshipDefVertex;
+    }
+
+    /**
+     * Creates a relationship-def after verifying the caller holds the TYPE_CREATE privilege.
+     *
+     * @param relationshipDef the relationship-def to create
+     * @param preCreateResult vertex returned by an earlier {@link #preCreate} call; when
+     *                        {@code null}, {@code preCreate} is invoked here
+     * @return the relationship-def read back from the vertex
+     * @throws AtlasBaseException if access verification or vertex creation fails
+     */
+    @Override
+    public AtlasRelationshipDef create(AtlasRelationshipDef relationshipDef, AtlasVertex preCreateResult)
+            throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.create({}, {})", relationshipDef, preCreateResult);
+        }
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, relationshipDef),
+                                             "create relationship-def ", relationshipDef.getName());
+
+        AtlasVertex vertex = (preCreateResult == null) ? preCreate(relationshipDef) : preCreateResult;
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.create({}, {}): {}", relationshipDef, preCreateResult, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns every relationship-def found among the type vertices of category RELATIONSHIP.
+     *
+     * @return list of relationship-defs, one per vertex returned by the typedef store
+     * @throws AtlasBaseException if a vertex cannot be converted to a relationship-def
+     */
+    @Override
+    public List<AtlasRelationshipDef> getAll() throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.getAll()");
+        }
+
+        List<AtlasRelationshipDef> ret      = new ArrayList<>();
+        Iterator<AtlasVertex>      vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.RELATIONSHIP);
+
+        while (vertices.hasNext()) {
+            ret.add(toRelationshipDef(vertices.next()));
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.getAll(): count={}", ret.size());
+        }
+
+        return ret;
+    }
+
+    /**
+     * Looks up a relationship-def by type name.
+     *
+     * @param name relationship-def type name
+     * @return the relationship-def read from the matching type vertex
+     * @throws AtlasBaseException with TYPE_NAME_NOT_FOUND when no relationship type vertex
+     *                            has the given name
+     */
+    @Override
+    public AtlasRelationshipDef getByName(String name) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.getByName({})", name);
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+        }
+
+        // The type-category property was previously read here and its value discarded;
+        // the lookup above already filters by category, so the read has been dropped.
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.getByName({}): {}", name, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Looks up a relationship-def by guid.
+     *
+     * @param guid relationship-def guid
+     * @return the relationship-def read from the matching type vertex
+     * @throws AtlasBaseException with TYPE_GUID_NOT_FOUND when no relationship type vertex
+     *                            has the given guid
+     */
+    @Override
+    public AtlasRelationshipDef getByGuid(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.getByGuid({})", guid);
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+        }
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.getByGuid({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Updates a relationship-def, addressing it by guid when the def carries a non-blank guid
+     * and by name otherwise.
+     *
+     * @param relationshipDef the updated relationship-def
+     * @return the relationship-def as stored after the update
+     * @throws AtlasBaseException if validation or the delegated update fails
+     */
+    @Override
+    public AtlasRelationshipDef update(AtlasRelationshipDef relationshipDef) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.update({})", relationshipDef);
+        }
+
+        validateType(relationshipDef);
+
+        AtlasRelationshipDef ret = StringUtils.isNotBlank(relationshipDef.getGuid())
+                ? updateByGuid(relationshipDef.getGuid(), relationshipDef)
+                : updateByName(relationshipDef.getName(), relationshipDef);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.update({}): {}", relationshipDef, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Updates the relationship-def stored under the given name.
+     *
+     * @param name            name of the existing relationship-def
+     * @param relationshipDef the updated relationship-def
+     * @return the relationship-def read back from the vertex after the update
+     * @throws AtlasBaseException if access is denied, the registered type is not a relationship
+     *                            type, no vertex exists for the name, or the update is rejected
+     *                            by the pre-update check
+     */
+    @Override
+    public AtlasRelationshipDef updateByName(String name, AtlasRelationshipDef relationshipDef)
+            throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.updateByName({}, {})", name, relationshipDef);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef),
+                                             "update relationship-def ", name);
+
+        validateType(relationshipDef);
+
+        AtlasType type = typeRegistry.getType(relationshipDef.getName());
+
+        if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(),
+                                         TypeCategory.RELATIONSHIP.name());
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+        }
+
+        preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex);
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.updateByName({}, {}): {}", name, relationshipDef, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Updates the relationship-def stored under the given guid.
+     *
+     * @param guid            guid of the existing relationship-def
+     * @param relationshipDef the updated relationship-def
+     * @return the relationship-def read back from the vertex after the update
+     * @throws AtlasBaseException if access is denied, the registered type is not a relationship
+     *                            type, no vertex exists for the guid, or the update is rejected
+     *                            by the pre-update check
+     */
+    @Override
+    public AtlasRelationshipDef updateByGuid(String guid, AtlasRelationshipDef relationshipDef)
+            throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.updateByGuid({})", guid);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef),
+                                             "update relationship-Def ",
+                                             (existingDef != null ? existingDef.getName() : guid));
+
+        validateType(relationshipDef);
+
+        AtlasType type = typeRegistry.getTypeByGuid(guid);
+
+        if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(),
+                                         TypeCategory.RELATIONSHIP.name());
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+        }
+
+        preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex);
+        // updates should not affect the edges between the types, as we do not allow updates
+        // that change the endpoints.
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.updateByGuid({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Prepares deletion of the relationship-def with the given name: verifies the TYPE_DELETE
+     * privilege, refuses when relationship instances of the type exist, and removes the type
+     * vertex's outgoing edges.
+     *
+     * @param name relationship-def type name
+     * @return the type vertex of the relationship-def
+     * @throws AtlasBaseException with TYPE_NAME_NOT_FOUND when no such vertex exists, or
+     *                            TYPE_HAS_REFERENCES when instance edges of the type exist
+     */
+    @Override
+    public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.preDeleteByName({})", name);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef),
+                                             "delete relationship-def ", name);
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+        }
+
+        if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(name)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.preDeleteByName({}): {}", name, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Prepares deletion of the relationship-def with the given guid: verifies the TYPE_DELETE
+     * privilege, refuses when relationship instances of the type exist, and removes the type
+     * vertex's outgoing edges.
+     *
+     * @param guid relationship-def guid
+     * @return the type vertex of the relationship-def
+     * @throws AtlasBaseException with TYPE_GUID_NOT_FOUND when no such vertex exists, or
+     *                            TYPE_HAS_REFERENCES when instance edges of the type exist
+     */
+    @Override
+    public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV2.preDeleteByGuid({})", guid);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef),
+                                             "delete relationship-def ",
+                                             (existingDef != null ? existingDef.getName() : guid));
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+        }
+
+        // the instance-edge check is keyed by type name, so read it from the vertex
+        String typeName = AtlasGraphUtilsV2.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class);
+
+        if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(typeName)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV2.preDeleteByGuid({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Populates a freshly created relationship-def vertex.
+     *
+     * When the ALLOW_RESERVED_KEYWORDS application property is false, the end names are
+     * rejected if they are DSL keywords. Afterwards the struct-level properties and the
+     * relationship-specific properties are written to the vertex.
+     *
+     * @param relationshipDef  the relationship-def being created
+     * @param relationshipType the registered type of the relationship-def
+     * @param vertex           the type vertex to populate
+     * @throws AtlasBaseException if application properties cannot be read or an end name is
+     *                            a reserved keyword
+     */
+    private void updateVertexPreCreate(AtlasRelationshipDef relationshipDef, AtlasRelationshipType relationshipType,
+                                       AtlasVertex vertex) throws AtlasBaseException {
+        final AtlasRelationshipEndDef firstEnd  = relationshipDef.getEndDef1();
+        final AtlasRelationshipEndDef secondEnd = relationshipDef.getEndDef2();
+
+        // reserved keywords are permitted unless the configuration says otherwise
+        boolean keywordsPermitted;
+
+        try {
+            keywordsPermitted = ApplicationProperties.get().getBoolean(ALLOW_RESERVED_KEYWORDS, true);
+        } catch (AtlasException configFailure) {
+            throw new AtlasBaseException(configFailure);
+        }
+
+        if (!keywordsPermitted && AtlasDSL.Parser.isKeyword(firstEnd.getName())) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END1_NAME_INVALID, firstEnd.getName());
+        }
+
+        if (!keywordsPermitted && AtlasDSL.Parser.isKeyword(secondEnd.getName())) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END2_NAME_INVALID, secondEnd.getName());
+        }
+
+        AtlasStructDefStoreV2.updateVertexPreCreate(relationshipDef, relationshipType, vertex, typeDefStore);
+
+        // ends, category and tag propagation
+        setVertexPropertiesFromRelationshipDef(relationshipDef, vertex);
+    }
+
+    private void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, 
AtlasRelationshipType relationshipType,
+                                AtlasVertex vertex) throws AtlasBaseException {
+        // We will not support an update to endpoints or category key
+        AtlasRelationshipDef existingRelationshipDef = 
toRelationshipDef(vertex);
+
+        preUpdateCheck(newRelationshipDef, existingRelationshipDef);
+        // we do allow change to tag propagation and the addition of new 
attributes.
+
+        AtlasStructDefStoreV2.updateVertexPreUpdate(newRelationshipDef, 
relationshipType, vertex, typeDefStore);
+
+        setVertexPropertiesFromRelationshipDef(newRelationshipDef, vertex);
+    }
+
+    /**
+     * Check ends are the same and relationshipCategory is the same.
+     *
+     * We do this by comparing 2 relationshipDefs to avoid exposing the 
AtlasVertex to unit testing.
+     *
+     * @param newRelationshipDef
+     * @param existingRelationshipDef
+     * @throws AtlasBaseException
+     */
+    public static void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, 
AtlasRelationshipDef existingRelationshipDef) throws AtlasBaseException {
+        // do not allow renames of the Def.
+        String existingName = existingRelationshipDef.getName();
+        String newName      = newRelationshipDef.getName();
+
+        if (!existingName.equals(newName)) {
+            throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_NAME_UPDATE,
+                    newRelationshipDef.getGuid(),existingName, newName);
+        }
+
+        RelationshipCategory existingRelationshipCategory = 
existingRelationshipDef.getRelationshipCategory();
+        RelationshipCategory newRelationshipCategory      = 
newRelationshipDef.getRelationshipCategory();
+
+        if ( !existingRelationshipCategory.equals(newRelationshipCategory)){
+            throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
+                    
newRelationshipDef.getName(),newRelationshipCategory.name(),
+                    existingRelationshipCategory.name() );
+        }
+
+        AtlasRelationshipEndDef existingEnd1 = 
existingRelationshipDef.getEndDef1();
+        AtlasRelationshipEndDef newEnd1      = newRelationshipDef.getEndDef1();
+
+        if ( !newEnd1.equals(existingEnd1) ) {
+            throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
+                                         newRelationshipDef.getName(), 
newEnd1.toString(), existingEnd1.toString());
+        }
+
+        AtlasRelationshipEndDef existingEnd2 = 
existingRelationshipDef.getEndDef2();
+        AtlasRelationshipEndDef newEnd2      = newRelationshipDef.getEndDef2();
+
+        if ( !newEnd2.equals(existingEnd2) ) {
+                throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE,
+                                         newRelationshipDef.getName(), 
newEnd2.toString(), existingEnd2.toString());
+        }
+    }
+
+    public static void 
setVertexPropertiesFromRelationshipDef(AtlasRelationshipDef relationshipDef, 
AtlasVertex vertex) {
+        vertex.setProperty(Constants.RELATIONSHIPTYPE_END1_KEY, 
AtlasType.toJson(relationshipDef.getEndDef1()));
+        vertex.setProperty(Constants.RELATIONSHIPTYPE_END2_KEY, 
AtlasType.toJson(relationshipDef.getEndDef2()));
+        // default the relationship category to association if it has not been 
specified.
+        String relationshipCategory = RelationshipCategory.ASSOCIATION.name();
+        if (relationshipDef.getRelationshipCategory()!=null) {
+            relationshipCategory 
=relationshipDef.getRelationshipCategory().name();
+        }
+        // Update RelationshipCategory
+        vertex.setProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, 
relationshipCategory);
+
+        if (relationshipDef.getPropagateTags() == null) {
+            vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, 
AtlasRelationshipDef.PropagateTags.NONE.name());
+        } else {
+            vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, 
relationshipDef.getPropagateTags().name());
+        }
+    }
+
+    private AtlasRelationshipDef toRelationshipDef(AtlasVertex vertex) throws 
AtlasBaseException {
+        AtlasRelationshipDef ret = null;
+
+        if (vertex != null && typeDefStore.isTypeVertex(vertex, 
TypeCategory.RELATIONSHIP)) {
+            String name         = 
vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class);
+            String description  = 
vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class);
+            String version      = 
vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class);
+            String end1Str = 
vertex.getProperty(Constants.RELATIONSHIPTYPE_END1_KEY, String.class);
+            String end2Str = 
vertex.getProperty(Constants.RELATIONSHIPTYPE_END2_KEY, String.class);
+            String relationStr  = 
vertex.getProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, String.class);
+            String propagateStr = 
vertex.getProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, 
String.class);
+
+            // set the ends
+            AtlasRelationshipEndDef endDef1 = AtlasType.fromJson(end1Str, 
AtlasRelationshipEndDef.class);
+            AtlasRelationshipEndDef endDef2 = AtlasType.fromJson(end2Str, 
AtlasRelationshipEndDef.class);
+
+            // set the relationship Category
+            RelationshipCategory relationshipCategory = null;
+            for (RelationshipCategory value : RelationshipCategory.values()) {
+                if (value.name().equals(relationStr)) {
+                    relationshipCategory = value;
+                }
+            }
+
+            // set the propagateTags
+            PropagateTags propagateTags = null;
+            for (PropagateTags value : PropagateTags.values()) {
+                if (value.name().equals(propagateStr)) {
+                    propagateTags = value;
+                }
+            }
+
+            ret = new AtlasRelationshipDef(name, description, version, 
relationshipCategory,  propagateTags, endDef1, endDef2);
+
+            // add in the attributes
+            AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore);
+        }
+
+        return ret;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
new file mode 100644
index 0000000..eb1079c
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -0,0 +1,837 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import 
org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
+import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
+import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
+import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
+import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
+
+@Component
+public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasRelationshipStoreV2.class);
+
+    private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
+
+    private final AtlasTypeRegistry         typeRegistry;
+    private final EntityGraphRetriever      entityRetriever;
+    private final DeleteHandlerV1 deleteHandler;
+    private final GraphHelper               graphHelper = 
GraphHelper.getInstance();
+    private final AtlasEntityChangeNotifier entityChangeNotifier;
+
+    /**
+     * Builds the relationship store.
+     *
+     * @param typeRegistry         type registry; also used to construct the entity retriever
+     * @param deleteHandler        delete handler kept for later use by this store
+     * @param entityChangeNotifier notifier used to report classification propagation changes
+     */
+    @Inject
+    public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler,
+                                    AtlasEntityChangeNotifier entityChangeNotifier) {
+        this.typeRegistry         = typeRegistry;
+        this.deleteHandler        = deleteHandler;
+        this.entityChangeNotifier = entityChangeNotifier;
+        this.entityRetriever      = new EntityGraphRetriever(typeRegistry);
+    }
+
+    /**
+     * Creates a relationship between the entities referenced by the two ends of the given
+     * relationship and notifies entities affected by classification propagation.
+     *
+     * @param relationship the relationship to create
+     * @return the created relationship mapped from its edge, or {@code null} when no edge
+     *         was created
+     * @throws AtlasBaseException if validation or creation fails
+     */
+    @Override
+    @GraphTransaction
+    public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> create({})", relationship);
+        }
+
+        final AtlasVertex firstVertex  = getVertexFromEndPoint(relationship.getEnd1());
+        final AtlasVertex secondVertex = getVertexFromEndPoint(relationship.getEnd2());
+
+        validateRelationship(firstVertex, secondVertex, relationship.getTypeName(), relationship.getAttributes());
+
+        final AtlasEdge createdEdge = createRelationship(firstVertex, secondVertex, relationship);
+
+        AtlasRelationship created = null;
+
+        if (createdEdge != null) {
+            created = entityRetriever.mapEdgeToAtlasRelationship(createdEdge);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== create({}): {}", relationship, created);
+        }
+
+        // notify entities for added/removed classification propagation
+        entityChangeNotifier.notifyPropagatedEntities();
+
+        return created;
+    }
+
+    /**
+     * Updates an existing relationship identified by its guid.
+     *
+     * The relationship type and both ends are immutable: a request that names a different
+     * type or resolves to a different end entity is rejected. An end may be identified either
+     * by guid or, when the guid is absent, by whatever {@code getVertexFromEndPoint} resolves
+     * it to; in both cases the resolved guid is the one compared against the stored edge.
+     *
+     * @param relationship the relationship carrying the guid and the updated values
+     * @return the updated relationship
+     * @throws AtlasBaseException if the guid is empty, or the update attempts to change the
+     *                            relationship type or one of its ends, or validation fails
+     */
+    @Override
+    @GraphTransaction
+    public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> update({})", relationship);
+        }
+
+        String guid = relationship.getGuid();
+
+        if (StringUtils.isEmpty(guid)) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
+        }
+
+        AtlasEdge   edge       = graphHelper.getEdgeForGUID(guid);
+        String      edgeType   = AtlasGraphUtilsV2.getTypeName(edge);
+        AtlasVertex end1Vertex = edge.getOutVertex();
+        AtlasVertex end2Vertex = edge.getInVertex();
+
+        // update shouldn't change the relationship type
+        if (StringUtils.isNotEmpty(relationship.getTypeName()) &&
+                !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED,
+                                         guid, edgeType, relationship.getTypeName());
+        }
+
+        // update shouldn't change end1
+        if (relationship.getEnd1() != null) {
+            String updatedEnd1Guid = relationship.getEnd1().getGuid();
+
+            if (updatedEnd1Guid == null) {
+                AtlasVertex updatedEnd1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+
+                updatedEnd1Guid = updatedEnd1Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd1Vertex);
+            }
+
+            if (updatedEnd1Guid != null) {
+                String end1Guid = AtlasGraphUtilsV2.getIdFromVertex(end1Vertex);
+
+                // compare the resolved guid; the raw guid on the request is null when the
+                // end was not identified by guid
+                if (!StringUtils.equalsIgnoreCase(updatedEnd1Guid, end1Guid)) {
+                    throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED,
+                                                 edgeType, guid, end1Guid, updatedEnd1Guid);
+                }
+            }
+        }
+
+        // update shouldn't change end2
+        if (relationship.getEnd2() != null) {
+            String updatedEnd2Guid = relationship.getEnd2().getGuid();
+
+            if (updatedEnd2Guid == null) {
+                AtlasVertex updatedEnd2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+
+                updatedEnd2Guid = updatedEnd2Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd2Vertex);
+            }
+
+            if (updatedEnd2Guid != null) {
+                String end2Guid = AtlasGraphUtilsV2.getIdFromVertex(end2Vertex);
+
+                if (!StringUtils.equalsIgnoreCase(updatedEnd2Guid, end2Guid)) {
+                    throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED,
+                                                 edgeType, guid, end2Guid, updatedEnd2Guid);
+                }
+            }
+        }
+
+        validateRelationship(end1Vertex, end2Vertex, edgeType, relationship.getAttributes());
+
+        AtlasRelationship ret = updateRelationship(edge, relationship);
+
+        // notify entities for added/removed classification propagation
+        entityChangeNotifier.notifyPropagatedEntities();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== update({}): {}", relationship, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Fetches the relationship edge for the given guid and maps it to an {@code AtlasRelationship}.
+     *
+     * @param guid guid of the relationship to look up
+     * @return the relationship mapped from the edge
+     * @throws AtlasBaseException if the lookup or the mapping fails
+     */
+    @Override
+    @GraphTransaction
+    public AtlasRelationship getById(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getById({})", guid);
+        }
+
+        AtlasRelationship relationship = entityRetriever.mapEdgeToAtlasRelationship(graphHelper.getEdgeForGUID(guid));
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getById({}): {}", guid, relationship);
+        }
+
+        return relationship;
+    }
+
+    /**
+     * Fetches the relationship edge for the given guid and maps it to an
+     * {@code AtlasRelationshipWithExtInfo}.
+     *
+     * @param guid guid of the relationship to look up
+     * @return the relationship with extended info mapped from the edge
+     * @throws AtlasBaseException if the lookup or the mapping fails
+     */
+    @Override
+    @GraphTransaction
+    public AtlasRelationshipWithExtInfo getExtInfoById(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getExtInfoById({})", guid);
+        }
+
+        AtlasRelationshipWithExtInfo relationshipWithExtInfo = entityRetriever.mapEdgeToAtlasRelationshipWithExtInfo(graphHelper.getEdgeForGUID(guid));
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getExtInfoById({}): {}", guid, relationshipWithExtInfo);
+        }
+
+        return relationshipWithExtInfo;
+    }
+
+    /**
+     * Deletes the relationship with the given guid; shorthand for
+     * {@code deleteById(guid, false)}.
+     *
+     * @param guid guid of the relationship to delete
+     * @throws AtlasBaseException if the two-argument overload rejects the request
+     */
+    @Override
+    @GraphTransaction
+    public void deleteById(String guid) throws AtlasBaseException {
+        final boolean forceDelete = false;
+
+        deleteById(guid, forceDelete);
+    }
+
+    /**
+     * Deletes the relationship with the given guid and then notifies entities affected by
+     * classification propagation changes.
+     *
+     * @param guid        guid of the relationship to delete; must not be empty
+     * @param forceDelete passed through unchanged to the delete handler
+     * @throws AtlasBaseException if the guid is empty, no edge exists for it, or the edge is
+     *                            already in DELETED state
+     */
+    @Override
+    @GraphTransaction
+    public void deleteById(String guid, boolean forceDelete) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> deleteById({}, {})", guid, forceDelete);
+        }
+
+        if (StringUtils.isEmpty(guid)) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, " empty/null guid");
+        }
+
+        AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
+
+        if (edge == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
+        }
+
+        if (getState(edge) == DELETED) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
+        }
+
+        deleteHandler.deleteRelationships(Collections.singleton(edge), forceDelete);
+
+        // notify entities for added/removed classification propagation
+        entityChangeNotifier.notifyPropagatedEntities();
+
+        if (LOG.isDebugEnabled()) {
+            // placeholders now match the arguments (the old message had a dangling "{}")
+            LOG.debug("<== deleteById({}, {})", guid, forceDelete);
+        }
+    }
+
+    /**
+     * Returns the relationship edge of the given type between the two vertices, creating it
+     * (after validation) when {@code getRelationshipEdge} finds none.
+     *
+     * @param end1Vertex   vertex at one end of the relationship
+     * @param end2Vertex   vertex at the other end of the relationship
+     * @param relationship relationship supplying the type name and attributes
+     * @return the existing or newly created edge
+     * @throws AtlasBaseException if validation or creation fails
+     */
+    @Override
+    public AtlasEdge getOrCreate(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+        AtlasEdge existingEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+        if (existingEdge != null) {
+            return existingEdge;
+        }
+
+        validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
+
+        return createRelationship(end1Vertex, end2Vertex, relationship);
+    }
+
+    /**
+     * Returns the relationship described by the argument, creating it when no matching edge
+     * exists between the two resolved end vertices.
+     *
+     * @param relationship relationship to find or create; validated before any lookup
+     * @return the relationship mapped from the existing or newly created edge, or null when no
+     *         edge is available after the create attempt
+     * @throws AtlasBaseException if validation or creation fails
+     */
+    @Override
+    public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getOrCreate({})", relationship);
+        }
+
+        // validated once up front; nothing below changes the relationship or the ends before creation,
+        // so the validation is not repeated on the create path
+        validateRelationship(relationship);
+
+        AtlasVertex       end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+        AtlasVertex       end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+        AtlasRelationship ret        = null;
+
+        // check if relationship exists
+        AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+        if (relationshipEdge == null) {
+            relationshipEdge = createRelationship(end1Vertex, end2Vertex, relationship);
+        }
+
+        if (relationshipEdge != null) {
+            ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getOrCreate({}): {}", relationship, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Creates a relationship edge between the two vertices and copies every attribute defined by
+     * the relationship type onto it.
+     *
+     * @param end1Vertex   vertex at one end of the relationship
+     * @param end2Vertex   vertex at the other end of the relationship
+     * @param relationship relationship supplying the type name and attribute values
+     * @return the newly created edge
+     * @throws AtlasBaseException RELATIONSHIP_ALREADY_EXISTS if an edge of this type already links
+     *                            the vertices; INTERNAL_ERROR if the repository reports a failure
+     */
+    private AtlasEdge createRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+        try {
+            // refuse to create a duplicate
+            if (getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName()) != null) {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(),
+                                             AtlasGraphUtilsV2.getIdFromVertex(end1Vertex), AtlasGraphUtilsV2.getIdFromVertex(end2Vertex));
+            }
+
+            AtlasEdge             newEdge      = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
+            AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+            if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
+                for (AtlasAttribute attribute : relationType.getAllAttributes().values()) {
+                    String propertyKey = attribute.getVertexPropertyName();
+                    Object value       = relationship.getAttribute(attribute.getName());
+
+                    AtlasGraphUtilsV2.setProperty(newEdge, propertyKey, value);
+                }
+            }
+
+            return newEdge;
+        } catch (RepositoryException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+    }
+
+    /**
+     * Applies tag-propagation changes and the attributes present in the request to an existing
+     * relationship edge.
+     *
+     * @param relationshipEdge edge of the relationship being updated
+     * @param relationship     requested state; only attributes it actually contains are written
+     * @return the relationship mapped from the edge after the changes
+     * @throws AtlasBaseException if the relationship type cannot be resolved or a step of the update fails
+     */
+    private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
+        // Use the type stored on the edge: update() accepts a request whose type name is empty or differs
+        // only in case from the edge's type, and such a name may not resolve in the type registry.
+        String relationshipTypeName = AtlasGraphUtilsV2.getTypeName(relationshipEdge);
+
+        if (StringUtils.isEmpty(relationshipTypeName)) {
+            relationshipTypeName = relationship.getTypeName();
+        }
+
+        AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
+
+        if (relationType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipTypeName + "'");
+        }
+
+        updateTagPropagations(relationshipEdge, relationship);
+
+        if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
+            for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
+                String attrName           = attr.getName();
+                String attrVertexProperty = attr.getVertexPropertyName();
+
+                // attributes absent from the request keep their stored value
+                if (relationship.hasAttribute(attrName)) {
+                    AtlasGraphUtilsV2.setProperty(relationshipEdge, attrVertexProperty, relationship.getAttribute(attrName));
+                }
+            }
+        }
+
+        return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
+    }
+
+    /**
+     * Brings the blocked propagated classifications recorded on a relationship edge in line with
+     * the requested set, and adjusts tag propagation for the entries that changed.
+     * A null set is treated as "nothing to do".
+     *
+     * @param edge                             relationship edge whose blocked list is updated
+     * @param blockedPropagatedClassifications requested blocked classifications; may be null
+     * @throws AtlasBaseException if a lookup or a propagation change fails
+     */
+    private void handleBlockedClassifications(AtlasEdge edge, 
Set<AtlasClassification> blockedPropagatedClassifications) throws 
AtlasBaseException {
+        if (blockedPropagatedClassifications != null) {
+            // state before the change: classification vertices of the edge and the subset currently blocked
+            List<AtlasVertex> propagatedClassificationVertices               = 
getClassificationVertices(edge);
+            List<String>      currentClassificationIds                       = 
getBlockedClassificationIds(edge);
+            List<AtlasVertex> currentBlockedPropagatedClassificationVertices = 
getBlockedClassificationVertices(propagatedClassificationVertices, 
currentClassificationIds);
+            List<AtlasVertex> updatedBlockedPropagatedClassificationVertices = 
new ArrayList<>();
+            List<String>      updatedClassificationIds                       = 
new ArrayList<>();
+
+            // resolve each requested classification to one of the edge's classification vertices
+            for (AtlasClassification classification : 
blockedPropagatedClassifications) {
+                AtlasVertex classificationVertex = 
validateBlockedPropagatedClassification(propagatedClassificationVertices, 
classification);
+
+                // ignore invalid blocked propagated classification
+                if (classificationVertex == null) {
+                    continue;
+                }
+
+                
updatedBlockedPropagatedClassificationVertices.add(classificationVertex);
+
+                String classificationId = 
classificationVertex.getIdForDisplay();
+
+                updatedClassificationIds.add(classificationId);
+            }
+
+            // persist the new blocked list (the property is removed when the list is empty)
+            addToBlockedClassificationIds(edge, updatedClassificationIds);
+
+            // remove propagated tag for added entry
+            // NOTE(review): the cast relies on CollectionUtils.subtract returning a List - confirm for the commons-collections version in use
+            List<AtlasVertex> addedBlockedClassifications = 
(List<AtlasVertex>) 
CollectionUtils.subtract(updatedBlockedPropagatedClassificationVertices, 
currentBlockedPropagatedClassificationVertices);
+
+            for (AtlasVertex classificationVertex : 
addedBlockedClassifications) {
+                List<AtlasVertex> removePropagationFromVertices = 
graphHelper.getPropagatedEntityVertices(classificationVertex);
+
+                deleteHandler.removeTagPropagation(classificationVertex, 
removePropagationFromVertices);
+            }
+
+            // add propagated tag for removed entry
+            List<AtlasVertex> removedBlockedClassifications = 
(List<AtlasVertex>) 
CollectionUtils.subtract(currentBlockedPropagatedClassificationVertices, 
updatedBlockedPropagatedClassificationVertices);
+
+            for (AtlasVertex classificationVertex : 
removedBlockedClassifications) {
+                List<AtlasVertex> addPropagationToVertices = 
graphHelper.getPropagatedEntityVertices(classificationVertex);
+
+                deleteHandler.addTagPropagation(classificationVertex, 
addPropagationToVertices);
+            }
+        }
+    }
+
+    /**
+     * Filters the given classification vertices down to those whose display id appears in the
+     * list of blocked classification ids.
+     *
+     * @param classificationVertices   candidate classification vertices
+     * @param blockedClassificationIds ids of blocked classifications; may be null or empty
+     * @return a new list with the matching vertices, empty when there are no blocked ids
+     */
+    private List<AtlasVertex> getBlockedClassificationVertices(List<AtlasVertex> classificationVertices, List<String> blockedClassificationIds) {
+        List<AtlasVertex> blockedVertices = new ArrayList<>();
+
+        if (CollectionUtils.isEmpty(blockedClassificationIds)) {
+            return blockedVertices;
+        }
+
+        for (AtlasVertex candidate : classificationVertices) {
+            if (blockedClassificationIds.contains(candidate.getIdForDisplay())) {
+                blockedVertices.add(candidate);
+            }
+        }
+
+        return blockedVertices;
+    }
+
+    /**
+     * Finds the classification vertex that corresponds to a requested blocked classification:
+     * a blocked classification must be one of the propagated classifications, matched by
+     * classification name and entity guid.
+     *
+     * @param classificationVertices propagated classification vertices to search
+     * @param classification         requested blocked classification
+     * @return the first matching vertex, or null when none matches
+     */
+    private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
+        for (AtlasVertex candidate : classificationVertices) {
+            String candidateName       = getClassificationName(candidate);
+            String candidateEntityGuid = getClassificationEntityGuid(candidate);
+
+            boolean sameName = candidateName.equals(classification.getTypeName());
+
+            if (sameName && candidateEntityGuid.equals(classification.getEntityGuid())) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Stores the blocked classification ids on the edge, or removes the property entirely when
+     * the list is empty. Does nothing for a null edge.
+     *
+     * @param edge              relationship edge to update; may be null
+     * @param classificationIds ids to store
+     */
+    private void addToBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
+        if (edge == null) {
+            return;
+        }
+
+        if (!classificationIds.isEmpty()) {
+            edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
+        } else {
+            edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
+        }
+    }
+
+    /**
+     * Applies a change of the propagateTags setting of a relationship edge, or, when the setting
+     * is unchanged, updates the edge's blocked propagated classifications instead.
+     * <p>
+     * When the setting changes, the classification-to-entities mapping is captured before and
+     * after the new value is written to the edge; the difference between the two decides which
+     * propagations are added and which are removed.
+     *
+     * @param edge         relationship edge being updated
+     * @param relationship requested state supplying the new propagateTags value and blocked classifications
+     * @throws AtlasBaseException if a lookup or a propagation change fails
+     */
+    private void updateTagPropagations(AtlasEdge edge, AtlasRelationship 
relationship) throws AtlasBaseException {
+        PropagateTags oldTagPropagation = getPropagateTags(edge);
+        PropagateTags newTagPropagation = relationship.getPropagateTags();
+
+        if (newTagPropagation != oldTagPropagation) {
+            // snapshot taken BEFORE the edge property is changed
+            List<AtlasVertex>                   currentClassificationVertices 
= getClassificationVertices(edge);
+            Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap     
= 
graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
+
+            // Update propagation edge
+            // NOTE(review): newTagPropagation.name() throws NPE if the request carries a null propagateTags - confirm callers never pass null
+            AtlasGraphUtilsV2.setProperty(edge, 
Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
+
+            // snapshot taken AFTER the change; the mapping is computed over the union of old and new classification vertices
+            List<AtlasVertex>                   updatedClassificationVertices 
= getClassificationVertices(edge);
+            List<AtlasVertex>                   classificationVerticesUnion   
= (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, 
updatedClassificationVertices);
+            Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap     
= 
graphHelper.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
+
+            // compute add/remove propagations list
+            Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap    = new 
HashMap<>();
+            Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new 
HashMap<>();
+
+            // nothing before, something now: everything is an addition
+            if (MapUtils.isEmpty(currentClassificationsMap) && 
MapUtils.isNotEmpty(updatedClassificationsMap)) {
+                addPropagationsMap.putAll(updatedClassificationsMap);
+
+            // something before, nothing now: everything is a removal
+            } else if (MapUtils.isNotEmpty(currentClassificationsMap) && 
MapUtils.isEmpty(updatedClassificationsMap)) {
+                removePropagationsMap.putAll(currentClassificationsMap);
+
+            } else {
+                // per classification, diff the entity lists of the two snapshots
+                for (AtlasVertex classificationVertex : 
updatedClassificationsMap.keySet()) {
+                    List<AtlasVertex> currentPropagatingEntities = 
currentClassificationsMap.containsKey(classificationVertex) ? 
currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
+                    List<AtlasVertex> updatedPropagatingEntities = 
updatedClassificationsMap.containsKey(classificationVertex) ? 
updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
+
+                    List<AtlasVertex> entitiesAdded   = (List<AtlasVertex>) 
CollectionUtils.subtract(updatedPropagatingEntities, 
currentPropagatingEntities);
+                    List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) 
CollectionUtils.subtract(currentPropagatingEntities, 
updatedPropagatingEntities);
+
+                    if (CollectionUtils.isNotEmpty(entitiesAdded)) {
+                        addPropagationsMap.put(classificationVertex, 
entitiesAdded);
+                    }
+
+                    if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
+                        removePropagationsMap.put(classificationVertex, 
entitiesRemoved);
+                    }
+                }
+            }
+
+            // apply the computed additions first, then the removals
+            for (AtlasVertex classificationVertex : 
addPropagationsMap.keySet()) {
+                deleteHandler.addTagPropagation(classificationVertex, 
addPropagationsMap.get(classificationVertex));
+            }
+
+            for (AtlasVertex classificationVertex : 
removePropagationsMap.keySet()) {
+                deleteHandler.removeTagPropagation(classificationVertex, 
removePropagationsMap.get(classificationVertex));
+            }
+        } else {
+            // update blocked propagated classifications only if there is no 
change is tag propagation (don't update both)
+            handleBlockedClassifications(edge, 
relationship.getBlockedPropagatedClassifications());
+        }
+    }
+
+    /**
+     * Validates a relationship supplied by a caller: it must be non-null, name a known
+     * relationship type, carry both ends, and the end types must match the relationship
+     * definition in either orientation. Finally the ends and the attribute values are validated,
+     * and the relationship is normalized.
+     *
+     * @param relationship relationship to validate
+     * @throws AtlasBaseException if any of the checks fails
+     */
+    private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
+        if (relationship == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null");
+        }
+
+        String                relationshipName = relationship.getTypeName();
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
+
+        if (relationshipType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
+        }
+
+        // the ends must be checked before their type names are read; reading them first
+        // turned a missing end into a NullPointerException instead of INVALID_PARAMETERS
+        if (relationship.getEnd1() == null || relationship.getEnd2() == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "end1/end2 is null");
+        }
+
+        String end1TypeName = getTypeNameFromObjectId(relationship.getEnd1());
+        String end2TypeName = getTypeNameFromObjectId(relationship.getEnd2());
+
+        boolean validEndTypes = false;
+
+        // ends may be supplied in definition order (end1, end2) or swapped
+        if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+            validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+        } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+            validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
+        }
+
+        if (!validEndTypes) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+        }
+
+        validateEnds(relationship);
+
+        validateAndNormalize(relationship);
+    }
+
+    /**
+     * Validates a relationship described by its two end vertices, its type name and its
+     * attributes: the type must be known, both vertices present, the vertex types must match the
+     * relationship definition in either orientation, and the attributes must pass the type's
+     * value validation.
+     *
+     * @param end1Vertex       vertex at one end of the relationship
+     * @param end2Vertex       vertex at the other end of the relationship
+     * @param relationshipName name of the relationship type
+     * @param attributes       attribute values to validate
+     * @throws AtlasBaseException if any of the checks fails
+     */
+    private void validateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, Map<String, Object> attributes) throws AtlasBaseException {
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
+
+        if (relationshipType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
+        }
+
+        if (end1Vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd1Type().getTypeName());
+        }
+
+        if (end2Vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd2Type().getTypeName());
+        }
+
+        String end1TypeName = AtlasGraphUtilsV2.getTypeName(end1Vertex);
+        String end2TypeName = AtlasGraphUtilsV2.getTypeName(end2Vertex);
+
+        // accept the ends in definition order, or swapped
+        boolean endsMatchDefinition;
+
+        if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+            endsMatchDefinition = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+        } else {
+            endsMatchDefinition = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)
+                                      && relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
+        }
+
+        if (!endsMatchDefinition) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+        }
+
+        // validate the attributes through a throw-away relationship instance
+        AtlasRelationship candidate        = new AtlasRelationship(relationshipName, attributes);
+        List<String>      validationErrors = new ArrayList<>();
+
+        relationshipType.validateValue(candidate, relationshipName, validationErrors);
+
+        if (!validationErrors.isEmpty()) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, validationErrors);
+        }
+
+        relationshipType.getNormalizedValue(candidate);
+    }
+
+
+    /**
+     * Checks both ends of the given relationship. Each end must carry a valid guid that resolves
+     * to a vertex. If the end also has unique attributes, an entity of the end's type must be
+     * found by them; otherwise the type name stored on the vertex must equal the end's type name.
+     *
+     * @param relationship relationship whose ends are checked
+     * @throws AtlasBaseException if an end fails one of the checks
+     */
+    private void validateEnds(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("validateEnds entry relationship:" + relationship);
+        }
+
+        AtlasRelationshipDef      relationshipDef = typeRegistry.getRelationshipDefByName(relationship.getTypeName());
+        AtlasObjectId[]           ends            = { relationship.getEnd1(), relationship.getEnd2() };
+        AtlasRelationshipEndDef[] endDefs         = { relationshipDef.getEndDef1(), relationshipDef.getEndDef2() };
+
+        for (int idx = 0; idx < ends.length; idx++) {
+            AtlasObjectId       end        = ends[idx];
+            String              endGuid    = end.getGuid();
+            String              endType    = end.getTypeName();
+            Map<String, Object> uniqueAttr = end.getUniqueAttributes();
+            AtlasVertex         endVertex  = AtlasGraphUtilsV2.findByGuid(endGuid);
+
+            if (!AtlasTypeUtil.isValidGuid(endGuid) || endVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, endGuid);
+            }
+
+            if (MapUtils.isNotEmpty(uniqueAttr)) {
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endType);
+
+                if (AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqueAttr) == null) {
+                    throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, endType, uniqueAttr.toString());
+                }
+
+                continue;
+            }
+
+            // check whether the guid is the correct type
+            String vertexTypeName = endVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
+
+            if (!Objects.equals(vertexTypeName, endType)) {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_INVALID_ENDTYPE, endDefs[idx].getName(), endGuid, vertexTypeName, endType);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("validateEnds exit successfully validated relationship:" + relationship);
+        }
+    }
+
+    /**
+     * Validates the relationship's guid, type and values, then normalizes the relationship
+     * through its type.
+     *
+     * @param relationship relationship to validate and normalize
+     * @throws AtlasBaseException if the guid is not valid, the type is unknown, or value
+     *                            validation reports messages
+     */
+    private void validateAndNormalize(AtlasRelationship relationship) throws AtlasBaseException {
+        if (!AtlasTypeUtil.isValidGuid(relationship.getGuid())) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, relationship.getGuid());
+        }
+
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+        if (relationshipType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.RELATIONSHIP.name(), relationship.getTypeName());
+        }
+
+        List<String> validationErrors = new ArrayList<>();
+
+        relationshipType.validateValue(relationship, relationship.getTypeName(), validationErrors);
+
+        if (!validationErrors.isEmpty()) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, validationErrors);
+        }
+
+        relationshipType.getNormalizedValue(relationship);
+    }
+
+    public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex 
toVertex, String relationshipType) {
+        String              relationshipLabel = 
getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
+        Iterator<AtlasEdge> edgesIterator     = 
getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
+        AtlasEdge           ret               = null;
+
+        while (edgesIterator != null && edgesIterator.hasNext()) {
+            AtlasEdge edge = edgesIterator.next();
+
+            if (edge != null) {
+                Status status = graphHelper.getStatus(edge);
+
+                if ((status == null || status == ACTIVE) &&
+                        
StringUtils.equals(getIdFromVertex(edge.getInVertex()), 
getIdFromVertex(toVertex))) {
+                    ret = edge;
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns the version carried by the relationship, falling back to
+     * {@code DEFAULT_RELATIONSHIP_VERSION} when the relationship or its version is null.
+     *
+     * @param relationship relationship to read the version from; may be null
+     * @return the relationship's version or the default
+     */
+    private Long getRelationshipVersion(AtlasRelationship relationship) {
+        Long version = null;
+
+        if (relationship != null) {
+            version = relationship.getVersion();
+        }
+
+        return (version == null) ? DEFAULT_RELATIONSHIP_VERSION : version;
+    }
+
+    /**
+     * Resolves a relationship end to its vertex: by guid when the end has one, otherwise by
+     * type name and unique attributes when both are present.
+     *
+     * @param endPoint relationship end to resolve
+     * @return the vertex found by the lookup, or null when the end has neither a guid nor a
+     *         type name with unique attributes
+     */
+    private AtlasVertex getVertexFromEndPoint(AtlasObjectId endPoint) {
+        if (StringUtils.isNotEmpty(endPoint.getGuid())) {
+            return AtlasGraphUtilsV2.findByGuid(endPoint.getGuid());
+        }
+
+        boolean hasTypeAndUniqueAttributes = StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes());
+
+        if (!hasTypeAndUniqueAttributes) {
+            return null;
+        }
+
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName());
+
+        return AtlasGraphUtilsV2.findByUniqueAttributes(entityType, endPoint.getUniqueAttributes());
+    }
+
+    /**
+     * Gets or creates the graph edge for a relationship and, when an edge is returned, stamps it
+     * with the relationship type name, a fresh random guid, the version and the tag-propagation
+     * setting, then applies blocked classifications and tag propagation.
+     *
+     * @param fromVertex   vertex the edge starts from
+     * @param toVertex     vertex the edge points to
+     * @param relationship relationship supplying type name, version and blocked classifications
+     * @return the edge returned by the graph helper; may be null
+     * @throws RepositoryException if the graph helper fails to get or create the edge
+     * @throws AtlasBaseException  if applying blocked classifications or tag propagation fails
+     */
+    private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException, AtlasBaseException {
+        String        edgeLabel      = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName());
+        PropagateTags propagateTags  = getRelationshipTagPropagation(fromVertex, toVertex, relationship);
+        AtlasEdge     relationEdge   = graphHelper.getOrCreateEdge(fromVertex, toVertex, edgeLabel);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created relationship edge from [{}] --> [{}] using edge label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), edgeLabel);
+        }
+
+        if (relationEdge == null) {
+            return null;
+        }
+
+        // map additional properties to relationship edge
+        final String newGuid = UUID.randomUUID().toString();
+
+        AtlasGraphUtilsV2.setProperty(relationEdge, Constants.ENTITY_TYPE_PROPERTY_KEY, relationship.getTypeName());
+        AtlasGraphUtilsV2.setProperty(relationEdge, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, newGuid);
+        AtlasGraphUtilsV2.setProperty(relationEdge, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship));
+        AtlasGraphUtilsV2.setProperty(relationEdge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, propagateTags.name());
+
+        // blocked propagated classifications
+        handleBlockedClassifications(relationEdge, relationship.getBlockedPropagatedClassifications());
+
+        // propagate tags
+        deleteHandler.addTagPropagation(relationEdge, propagateTags);
+
+        return relationEdge;
+    }
+
+    /**
+     * Determines the tag-propagation direction for an edge between the given vertices, starting
+     * from the propagateTags value of the relationship definition.
+     * <p>
+     * Example: the definition has end1 (hive_db), end2 (hive_table) and ONE_TO_TWO, but the edge
+     * runs [hive_table --> hive_db]. In that case the direction is swapped
+     * (ONE_TO_TWO <-> TWO_TO_ONE); any other value is returned as defined.
+     *
+     * @param fromVertex   vertex the edge starts from
+     * @param toVertex     vertex the edge points to
+     * @param relationship relationship supplying the type name
+     * @return the propagation setting to store on the edge
+     */
+    private PropagateTags getRelationshipTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+        AtlasRelationshipDef  relationshipDef  = relationshipType.getRelationshipDef();
+        Set<String>           fromTypes        = getTypeAndAllSuperTypes(getTypeName(fromVertex));
+        Set<String>           toTypes          = getTypeAndAllSuperTypes(getTypeName(toVertex));
+        PropagateTags         defined          = relationshipDef.getPropagateTags();
+
+        boolean edgeRunsEnd2ToEnd1 = fromTypes.contains(relationshipDef.getEndDef2().getType())
+                                         && toTypes.contains(relationshipDef.getEndDef1().getType());
+
+        if (!edgeRunsEnd2ToEnd1) {
+            return defined;
+        }
+
+        if (defined == ONE_TO_TWO) {
+            return TWO_TO_ONE;
+        }
+
+        if (defined == TWO_TO_ONE) {
+            return ONE_TO_TWO;
+        }
+
+        return defined;
+    }
+
+    /**
+     * Determines the edge label for a relationship of the given type between two vertices.
+     * <p>
+     * The vertex types, including their supertypes, are matched against the end types of the
+     * relationship definition, e.g. [hive_process -> hive_table] -> [Process -> DataSet]. When
+     * the matching end has a relationship attribute, that attribute's edge label is used;
+     * otherwise the relationship definition's own label is returned.
+     *
+     * @param fromVertex           vertex the edge starts from
+     * @param toVertex             vertex the edge points to
+     * @param relationshipTypeName name of the relationship type
+     * @return the edge label to use
+     */
+    private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipTypeName) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getRelationshipEdgeLabel({})", relationshipTypeName);
+        }
+
+        AtlasRelationshipType   relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
+        AtlasRelationshipDef    relationshipDef  = relationshipType.getRelationshipDef();
+        String                  defaultLabel     = relationshipDef.getRelationshipLabel();
+        AtlasRelationshipEndDef end1Def          = relationshipDef.getEndDef1();
+        AtlasRelationshipEndDef end2Def          = relationshipDef.getEndDef2();
+        Set<String>             fromTypes        = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(fromVertex));
+        Set<String>             toTypes          = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(toVertex));
+        AtlasAttribute          endAttribute;
+
+        if (fromTypes.contains(end1Def.getType()) && toTypes.contains(end2Def.getType())) {
+            // edge runs in definition order: use end1's relationship attribute
+            endAttribute = relationshipType.getEnd1Type().getRelationshipAttribute(end1Def.getName());
+        } else if (fromTypes.contains(end2Def.getType()) && toTypes.contains(end1Def.getType())) {
+            // edge runs in the opposite order: use end2's relationship attribute
+            endAttribute = relationshipType.getEnd2Type().getRelationshipAttribute(end2Def.getName());
+        } else {
+            endAttribute = null;
+        }
+
+        return (endAttribute == null) ? defaultLabel : endAttribute.getRelationshipEdgeLabel();
+    }
+
+    /**
+     * Returns the names of the given entity type and all of its supertypes as reported by the
+     * type registry.
+     *
+     * @param entityTypeName name of the entity type
+     * @return the type's name set, or a new empty set when the registry has no such entity type
+     */
+    public Set<String> getTypeAndAllSuperTypes(String entityTypeName) {
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+
+        if (entityType == null) {
+            return new HashSet<String>();
+        }
+
+        return entityType.getTypeAndAllSuperTypes();
+    }
+
+    /**
+     * Returns the type name of an object id, looking it up by guid when the object id itself
+     * carries a blank type name.
+     *
+     * @param objectId object id to read the type name from
+     * @return the type name from the object id, or the one resolved from its guid
+     */
+    private String getTypeNameFromObjectId(AtlasObjectId objectId) {
+        String declaredTypeName = objectId.getTypeName();
+
+        return StringUtils.isBlank(declaredTypeName) ? AtlasGraphUtilsV2.getTypeNameFromGuid(objectId.getGuid()) : declaredTypeName;
+    }
+
+    /**
+     * Tells whether the vertex has at least one edge, in either direction, carrying the edge
+     * label derived from the vertex's type and the given relationship type.
+     *
+     * @param vertex               vertex to inspect
+     * @param relationshipTypeName name of the relationship type
+     * @return true if such an edge exists, false otherwise
+     */
+    private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String relationshipTypeName) {
+        String              edgeLabel = getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName);
+        Iterator<AtlasEdge> edges     = graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, edgeLabel);
+
+        return edges != null && edges.hasNext();
+    }
+
+    /**
+     * Determines the relationship edge label seen from an entity type: the type (or one of its
+     * supertypes) is matched against end1 first, then end2, of the relationship definition, and
+     * the edge label of that end's attribute is returned.
+     *
+     * @param typeName             name of the entity type
+     * @param relationshipTypeName name of the relationship type
+     * @return the attribute's edge label, or null when the type matches neither end or the end
+     *         has no such attribute
+     */
+    private String getRelationshipEdgeLabel(String typeName, String relationshipTypeName) {
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
+        AtlasRelationshipDef  relationshipDef  = relationshipType.getRelationshipDef();
+        AtlasEntityType       end1Type         = relationshipType.getEnd1Type();
+        AtlasEntityType       end2Type         = relationshipType.getEnd2Type();
+        Set<String>           typeAndSupers    = getTypeAndAllSuperTypes(typeName);
+        AtlasEntityType       matchedEndType;
+        String                endAttributeName;
+
+        if (typeAndSupers.contains(end1Type.getTypeName())) {
+            matchedEndType   = end1Type;
+            endAttributeName = relationshipDef.getEndDef1().getName();
+        } else if (typeAndSupers.contains(end2Type.getTypeName())) {
+            matchedEndType   = end2Type;
+            endAttributeName = relationshipDef.getEndDef2().getName();
+        } else {
+            return null;
+        }
+
+        if (endAttributeName == null) {
+            return null;
+        }
+
+        AtlasAttribute endAttribute = matchedEndType.getAttribute(endAttributeName);
+
+        return (endAttribute == null) ? null : endAttribute.getRelationshipEdgeLabel();
+    }
+}
\ No newline at end of file

Reply via email to