http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java
new file mode 100644
index 0000000..4082fde
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * ClassificationDef store in v1 format.
+ */
+class AtlasClassificationDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasClassificationDef> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasClassificationDefStoreV2.class);
+
+    private static final String  TRAIT_NAME_REGEX   = "[a-zA-Z][a-zA-Z0-9_ 
.]*";
+
+    private static final Pattern TRAIT_NAME_PATTERN = 
Pattern.compile(TRAIT_NAME_REGEX);
+
+    public AtlasClassificationDefStoreV2(AtlasTypeDefGraphStoreV2 
typeDefStore, AtlasTypeRegistry typeRegistry) {
+        super(typeDefStore, typeRegistry);
+    }
+
+    @Override
+    public AtlasVertex preCreate(AtlasClassificationDef classificationDef) 
throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.preCreate({})", 
classificationDef);
+        }
+
+        validateType(classificationDef);
+
+        AtlasType type = typeRegistry.getType(classificationDef.getName());
+
+        if (type.getTypeCategory() != 
org.apache.atlas.model.TypeCategory.CLASSIFICATION) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, 
classificationDef.getName(), TypeCategory.TRAIT.name());
+        }
+
+        AtlasVertex ret = 
typeDefStore.findTypeVertexByName(classificationDef.getName());
+
+        if (ret != null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, 
classificationDef.getName());
+        }
+
+        ret = typeDefStore.createTypeVertex(classificationDef);
+
+        updateVertexPreCreate(classificationDef, 
(AtlasClassificationType)type, ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.preCreate({}): {}", 
classificationDef, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasClassificationDef create(AtlasClassificationDef 
classificationDef, AtlasVertex preCreateResult) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.create({}, {})", 
classificationDef, preCreateResult);
+        }
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, classificationDef), "create 
classification-def ", classificationDef.getName());
+
+        AtlasVertex vertex = (preCreateResult == null) ? 
preCreate(classificationDef) : preCreateResult;
+
+        updateVertexAddReferences(classificationDef, vertex);
+
+        AtlasClassificationDef ret = toClassificationDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.create({}, {}): {}", 
classificationDef, preCreateResult, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public List<AtlasClassificationDef> getAll() throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.getAll()");
+        }
+
+        List<AtlasClassificationDef> ret = new ArrayList<>();
+
+        Iterator<AtlasVertex> vertices = 
typeDefStore.findTypeVerticesByCategory(TypeCategory.TRAIT);
+        while (vertices.hasNext()) {
+            ret.add(toClassificationDef(vertices.next()));
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.getAll(): count={}", 
ret.size());
+        }
+        return ret;
+    }
+
+    @Override
+    public AtlasClassificationDef getByName(String name) throws 
AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.getByName({})", name);
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.TRAIT);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
name);
+        }
+
+        vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, 
TypeCategory.class);
+
+        AtlasClassificationDef ret = toClassificationDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.getByName({}): {}", 
name, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasClassificationDef getByGuid(String guid) throws 
AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.getByGuid({})", guid);
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.TRAIT);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, 
guid);
+        }
+
+        AtlasClassificationDef ret = toClassificationDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.getByGuid({}): {}", 
guid, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasClassificationDef update(AtlasClassificationDef classifiDef) 
throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.update({})", 
classifiDef);
+        }
+
+        validateType(classifiDef);
+
+        AtlasClassificationDef ret = 
StringUtils.isNotBlank(classifiDef.getGuid())
+                  ? updateByGuid(classifiDef.getGuid(), classifiDef) : 
updateByName(classifiDef.getName(), classifiDef);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.update({}): {}", 
classifiDef, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasClassificationDef updateByName(String name, 
AtlasClassificationDef classificationDef)
+        throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.updateByName({}, 
{})", name, classificationDef);
+        }
+
+        AtlasClassificationDef existingDef   = 
typeRegistry.getClassificationDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update 
classification-def ", name);
+
+        validateType(classificationDef);
+
+        AtlasType type = typeRegistry.getType(classificationDef.getName());
+
+        if (type.getTypeCategory() != 
org.apache.atlas.model.TypeCategory.CLASSIFICATION) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, 
classificationDef.getName(), TypeCategory.TRAIT.name());
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.TRAIT);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
name);
+        }
+
+        updateVertexPreUpdate(classificationDef, 
(AtlasClassificationType)type, vertex);
+        updateVertexAddReferences(classificationDef, vertex);
+
+        AtlasClassificationDef ret = toClassificationDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.updateByName({}, {}): 
{}", name, classificationDef, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasClassificationDef updateByGuid(String guid, 
AtlasClassificationDef classificationDef) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.updateByGuid({})", 
guid);
+        }
+
+        AtlasClassificationDef existingDef   = 
typeRegistry.getClassificationDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update 
classification-def ", (existingDef != null ? existingDef.getName() : guid));
+
+        validateType(classificationDef);
+
+        AtlasType type = typeRegistry.getTypeByGuid(guid);
+
+        if (type.getTypeCategory() != 
org.apache.atlas.model.TypeCategory.CLASSIFICATION) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, 
classificationDef.getName(), TypeCategory.TRAIT.name());
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.TRAIT);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, 
guid);
+        }
+
+        updateVertexPreUpdate(classificationDef, 
(AtlasClassificationType)type, vertex);
+        updateVertexAddReferences(classificationDef, vertex);
+
+        AtlasClassificationDef ret = toClassificationDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.updateByGuid({}): 
{}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.preDeleteByName({})", 
name);
+        }
+
+        AtlasClassificationDef existingDef = 
typeRegistry.getClassificationDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete 
classification-def ", name);
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, 
TypeCategory.TRAIT);
+
+        if (AtlasGraphUtilsV2.typeHasInstanceVertex(name)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, 
name);
+        }
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
name);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.preDeleteByName({}): 
ret=", name, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasClassificationDefStoreV1.preDeleteByGuid({})", 
guid);
+        }
+
+        AtlasClassificationDef existingDef = 
typeRegistry.getClassificationDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete 
classification-def ", (existingDef != null ? existingDef.getName() : guid));
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, 
TypeCategory.TRAIT);
+
+        String typeName = AtlasGraphUtilsV2.getProperty(ret, 
Constants.TYPENAME_PROPERTY_KEY, String.class);
+
+        if (AtlasGraphUtilsV2.typeHasInstanceVertex(typeName)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, 
typeName);
+        }
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, 
guid);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasClassificationDefStoreV1.preDeleteByGuid({}): 
ret=", guid, ret);
+        }
+
+        return ret;
+    }
+
+    private void updateVertexPreCreate(AtlasClassificationDef  
classificationDef,
+                                       AtlasClassificationType 
classificationType,
+                                       AtlasVertex             vertex) throws 
AtlasBaseException {
+        AtlasStructDefStoreV2.updateVertexPreCreate(classificationDef, 
classificationType, vertex, typeDefStore);
+    }
+
+    private void updateVertexPreUpdate(AtlasClassificationDef  
classificationDef,
+                                       AtlasClassificationType 
classificationType,
+                                       AtlasVertex             vertex) throws 
AtlasBaseException {
+        AtlasStructDefStoreV2.updateVertexPreUpdate(classificationDef, 
classificationType, vertex, typeDefStore);
+    }
+
+    private void updateVertexAddReferences(AtlasClassificationDef 
classificationDef, AtlasVertex vertex) throws AtlasBaseException {
+        AtlasStructDefStoreV2.updateVertexAddReferences(classificationDef, 
vertex, typeDefStore);
+
+        typeDefStore.createSuperTypeEdges(vertex, 
classificationDef.getSuperTypes(), TypeCategory.TRAIT);
+        // create edges from this vertex to entity Type vertices with the 
supplied entity type names
+        typeDefStore.createEntityTypeEdges(vertex, 
classificationDef.getEntityTypes());
+    }
+
+    private AtlasClassificationDef toClassificationDef(AtlasVertex vertex) 
throws AtlasBaseException {
+        AtlasClassificationDef ret = null;
+
+        if (vertex != null && typeDefStore.isTypeVertex(vertex, 
TypeCategory.TRAIT)) {
+            ret = new AtlasClassificationDef();
+
+            AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore);
+
+            ret.setSuperTypes(typeDefStore.getSuperTypeNames(vertex));
+            ret.setEntityTypes(typeDefStore.getEntityTypeNames(vertex));
+        }
+
+        return ret;
+    }
+
+    @Override
+    public boolean isValidName(String typeName) {
+        Matcher m = TRAIT_NAME_PATTERN.matcher(typeName);
+
+        return m.matches();
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..7ed99a4
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.listener.EntityChangeListenerV2;
+import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.FullTextMapperV2;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.util.AtlasRepositoryConfiguration;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
+import static 
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
+import static 
org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled;
+
+
+@Component
+public class AtlasEntityChangeNotifier {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
+
+    private final Set<EntityChangeListener>   entityChangeListeners;
+    private final Set<EntityChangeListenerV2> entityChangeListenersV2;
+    private final AtlasInstanceConverter      instanceConverter;
+    private final FullTextMapperV2            fullTextMapperV2;
+    private final AtlasTypeRegistry           atlasTypeRegistry;
+
+
+    @Inject
+    public AtlasEntityChangeNotifier(Set<EntityChangeListener> 
entityChangeListeners,
+                                     Set<EntityChangeListenerV2> 
entityChangeListenersV2,
+                                     AtlasInstanceConverter instanceConverter,
+                                     FullTextMapperV2 fullTextMapperV2,
+                                     AtlasTypeRegistry atlasTypeRegistry) {
+        this.entityChangeListeners   = entityChangeListeners;
+        this.entityChangeListenersV2 = entityChangeListenersV2;
+        this.instanceConverter       = instanceConverter;
+        this.fullTextMapperV2 = fullTextMapperV2;
+        this.atlasTypeRegistry = atlasTypeRegistry;
+    }
+
+    public void onEntitiesMutated(EntityMutationResponse 
entityMutationResponse, boolean isImport) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(entityChangeListeners) || 
instanceConverter == null) {
+            return;
+        }
+
+        List<AtlasEntityHeader> createdEntities          = 
entityMutationResponse.getCreatedEntities();
+        List<AtlasEntityHeader> updatedEntities          = 
entityMutationResponse.getUpdatedEntities();
+        List<AtlasEntityHeader> partiallyUpdatedEntities = 
entityMutationResponse.getPartialUpdatedEntities();
+        List<AtlasEntityHeader> deletedEntities          = 
entityMutationResponse.getDeletedEntities();
+
+        // complete full text mapping before calling toReferenceables(), from 
notifyListners(), to
+        // include all vertex updates in the current graph-transaction
+        doFullTextMapping(createdEntities);
+        doFullTextMapping(updatedEntities);
+        doFullTextMapping(partiallyUpdatedEntities);
+
+        notifyListeners(createdEntities, EntityOperation.CREATE, isImport);
+        notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport);
+        notifyListeners(partiallyUpdatedEntities, 
EntityOperation.PARTIAL_UPDATE, isImport);
+        notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
+
+        notifyPropagatedEntities();
+    }
+
+    public void onClassificationAddedToEntity(AtlasEntity entity, 
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
+
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsAdded(entity, addedClassifications);
+            }
+        } else {
+            updateFullTextMapping(entity.getGuid(), addedClassifications);
+
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+            List<Struct>  traits    = toStruct(addedClassifications);
+
+            if (entity == null || CollectionUtils.isEmpty(traits)) {
+                return;
+            }
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsAdded(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitAdd");
+                }
+            }
+        }
+    }
+
+    public void onClassificationUpdatedToEntity(AtlasEntity entity, 
List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
+
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsUpdated(entity, 
updatedClassifications);
+            }
+        } else {
+            doFullTextMapping(entity.getGuid());
+
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+            List<Struct>  traits    = toStruct(updatedClassifications);
+
+            if (entityRef == null || CollectionUtils.isEmpty(traits)) {
+                return;
+            }
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsUpdated(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitUpdate");
+                }
+            }
+        }
+    }
+
+    public void onClassificationDeletedFromEntity(AtlasEntity entity, 
List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
+
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsDeleted(entity, 
deletedClassifications);
+            }
+        } else {
+            doFullTextMapping(entity.getGuid());
+
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+            List<Struct>  traits    = toStruct(deletedClassifications);
+
+            if (entityRef == null || 
CollectionUtils.isEmpty(deletedClassifications)) {
+                return;
+            }
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsDeleted(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitDelete");
+                }
+            }
+
+        }
+    }
+
+    public void onTermAddedToEntities(AtlasGlossaryTerm term, 
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+        // listeners notified on term-entity association only if v2 
notifications are enabled
+        if (isV2EntityNotificationEnabled()) {
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onTermAdded(term, entityIds);
+            }
+        } else {
+            List<Referenceable> entityRefs = toReferenceables(entityIds);
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTermAdded(entityRefs, term);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TermAdd");
+                }
+            }
+        }
+    }
+
+    public void onTermDeletedFromEntities(AtlasGlossaryTerm term, 
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+        // listeners notified on term-entity disassociation only if v2 
notifications are enabled
+        if (isV2EntityNotificationEnabled()) {
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onTermDeleted(term, entityIds);
+            }
+        } else {
+            List<Referenceable> entityRefs = toReferenceables(entityIds);
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTermDeleted(entityRefs, term);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TermDelete");
+                }
+            }
+        }
+    }
+
+    public void notifyPropagatedEntities() throws AtlasBaseException {
+        RequestContext                         context             = 
RequestContext.get();
+        Map<String, List<AtlasClassification>> addedPropagations   = 
context.getAddedPropagations();
+        Map<String, List<AtlasClassification>> removedPropagations = 
context.getRemovedPropagations();
+
+        notifyPropagatedEntities(addedPropagations, 
PROPAGATED_CLASSIFICATION_ADD);
+        notifyPropagatedEntities(removedPropagations, 
PROPAGATED_CLASSIFICATION_DELETE);
+    }
+
+    private void notifyPropagatedEntities(Map<String, 
List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) 
throws AtlasBaseException {
+        if (MapUtils.isEmpty(entityPropagationMap) || action == null) {
+            return;
+        }
+
+        for (String guid : entityPropagationMap.keySet()) {
+            AtlasEntityWithExtInfo entityWithExtInfo = 
instanceConverter.getAndCacheEntity(guid);
+            AtlasEntity            entity            = entityWithExtInfo != 
null ? entityWithExtInfo.getEntity() : null;
+
+            if (entity == null) {
+                continue;
+            }
+
+            if (action == PROPAGATED_CLASSIFICATION_ADD) {
+                onClassificationAddedToEntity(entity, 
entityPropagationMap.get(guid));
+            } else if (action == PROPAGATED_CLASSIFICATION_DELETE) {
+                onClassificationDeletedFromEntity(entity, 
entityPropagationMap.get(guid));
+            }
+        }
+    }
+
+    private String getListenerName(EntityChangeListener listener) {
+        return listener.getClass().getSimpleName();
+    }
+
+    private void notifyListeners(List<AtlasEntityHeader> entityHeaders, 
EntityOperation operation, boolean isImport) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(entityHeaders)) {
+            return;
+        }
+
+        if (isV2EntityNotificationEnabled()) {
+            notifyV2Listeners(entityHeaders, operation, isImport);
+        } else {
+            notifyV1Listeners(entityHeaders, operation, isImport);
+        }
+    }
+
+    private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, 
EntityOperation operation, boolean isImport) throws AtlasBaseException {
+        List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, 
operation);
+
+        for (EntityChangeListener listener : entityChangeListeners) {
+            try {
+                switch (operation) {
+                    case CREATE:
+                        listener.onEntitiesAdded(typedRefInsts, isImport);
+                        break;
+                    case UPDATE:
+                    case PARTIAL_UPDATE:
+                        listener.onEntitiesUpdated(typedRefInsts, isImport);
+                        break;
+                    case DELETE:
+                        listener.onEntitiesDeleted(typedRefInsts, isImport);
+                        break;
+                }
+            } catch (AtlasException e) {
+                throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), operation.toString());
+            }
+        }
+    }
+
+    private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, 
EntityOperation operation, boolean isImport) throws AtlasBaseException {
+        List<AtlasEntity> entities = toAtlasEntities(entityHeaders);
+
+        for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+            switch (operation) {
+                case CREATE:
+                    listener.onEntitiesAdded(entities, isImport);
+                    break;
+                case UPDATE:
+                case PARTIAL_UPDATE:
+                    listener.onEntitiesUpdated(entities, isImport);
+                    break;
+                case DELETE:
+                    listener.onEntitiesDeleted(entities, isImport);
+                    break;
+            }
+        }
+    }
+
+        private List<Referenceable> toReferenceables(List<AtlasEntityHeader> 
entityHeaders, EntityOperation operation) throws AtlasBaseException {
+        List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
+
+        // delete notifications don't need all attributes. Hence the special 
handling for delete operation
+        if (operation == EntityOperation.DELETE) {
+            for (AtlasEntityHeader entityHeader : entityHeaders) {
+                ret.add(new Referenceable(entityHeader.getGuid(), 
entityHeader.getTypeName(), entityHeader.getAttributes()));
+            }
+        } else {
+            for (AtlasEntityHeader entityHeader : entityHeaders) {
+                ret.add(toReferenceable(entityHeader.getGuid()));
+            }
+        }
+
+        return ret;
+    }
+
+    private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> 
entityIds) throws AtlasBaseException {
+        List<Referenceable> ret = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(entityIds)) {
+            for (AtlasRelatedObjectId relatedObjectId : entityIds) {
+                String entityGuid = relatedObjectId.getGuid();
+
+                ret.add(toReferenceable(entityGuid));
+            }
+        }
+
+        return ret;
+    }
+
+    private Referenceable toReferenceable(String entityId) throws 
AtlasBaseException {
+        Referenceable ret = null;
+
+        if (StringUtils.isNotEmpty(entityId)) {
+            ret = instanceConverter.getReferenceable(entityId);
+        }
+
+        return ret;
+    }
+
+        private List<Struct> toStruct(List<AtlasClassification> 
classifications) throws AtlasBaseException {
+        List<Struct> ret = null;
+
+        if (classifications != null) {
+            ret = new ArrayList<>(classifications.size());
+
+            for (AtlasClassification classification : classifications) {
+                if (classification != null) {
+                    ret.add(instanceConverter.getTrait(classification));
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> 
entityHeaders) throws AtlasBaseException {
+        List<AtlasEntity> ret = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(entityHeaders)) {
+            for (AtlasEntityHeader entityHeader : entityHeaders) {
+                String                 entityGuid        = 
entityHeader.getGuid();
+                String                 typeName          = 
entityHeader.getTypeName();
+
+                // Skip all internal types as the HARD DELETE will cause 
lookup errors
+                AtlasEntityType entityType = 
atlasTypeRegistry.getEntityTypeByName(typeName);
+                if (Objects.nonNull(entityType) && 
entityType.isInternalType()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Skipping internal type = {}", typeName);
+                    }
+                    continue;
+                }
+
+                AtlasEntityWithExtInfo entityWithExtInfo = 
instanceConverter.getAndCacheEntity(entityGuid);
+
+                if (entityWithExtInfo != null) {
+                    ret.add(entityWithExtInfo.getEntity());
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) {
+        if (CollectionUtils.isEmpty(entityHeaders)) {
+            return;
+        }
+
+        try {
+            if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
+                return;
+            }
+        } catch (AtlasException e) {
+            LOG.warn("Unable to determine if FullText is disabled. Proceeding 
with FullText mapping");
+        }
+
+        for (AtlasEntityHeader entityHeader : entityHeaders) {
+            if(GraphHelper.isInternalType(entityHeader.getTypeName())) {
+                continue;
+            }
+
+            String      guid   = entityHeader.getGuid();
+            AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+            if(vertex == null) {
+                continue;
+            }
+
+            try {
+                String fullText = fullTextMapperV2.getIndexTextForEntity(guid);
+
+                GraphHelper.setProperty(vertex, 
Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
+            } catch (AtlasBaseException e) {
+                LOG.error("FullText mapping failed for Vertex[ guid = {} ]", 
guid, e);
+            }
+        }
+    }
+
+    private void updateFullTextMapping(String entityId, 
List<AtlasClassification> classifications) {
+        try {
+            if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
+                return;
+            }
+        } catch (AtlasException e) {
+            LOG.warn("Unable to determine if FullText is disabled. Proceeding 
with FullText mapping");
+        }
+
+        if (StringUtils.isEmpty(entityId) || 
CollectionUtils.isEmpty(classifications)) {
+            return;
+        }
+
+        AtlasVertex atlasVertex = AtlasGraphUtilsV2.findByGuid(entityId);
+        if(atlasVertex == null || GraphHelper.isInternalType(atlasVertex)) {
+            return;
+        }
+
+        try {
+            String classificationFullText = 
fullTextMapperV2.getIndexTextForClassifications(entityId, classifications);
+            String existingFullText = (String) 
GraphHelper.getProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY);
+
+            String newFullText = existingFullText + " " + 
classificationFullText;
+            GraphHelper.setProperty(atlasVertex, 
Constants.ENTITY_TEXT_PROPERTY_KEY, newFullText);
+        } catch (AtlasBaseException e) {
+            LOG.error("FullText mapping failed for Vertex[ guid = {} ]", 
entityId, e);
+        }
+    }
+
+    private void doFullTextMapping(String guid) {
+        AtlasEntityHeader entityHeader = new AtlasEntityHeader();
+        entityHeader.setGuid(guid);
+
+        doFullTextMapping(Collections.singletonList(entityHeader));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java
new file mode 100644
index 0000000..f427136
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * EntityDef store in v1 format.
+ */
+public class AtlasEntityDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasEntityDef> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasEntityDefStoreV2.class);
+
+    @Inject
+    public AtlasEntityDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, 
AtlasTypeRegistry typeRegistry) {
+        super(typeDefStore, typeRegistry);
+    }
+
+    @Override
+    public AtlasVertex preCreate(AtlasEntityDef entityDef) throws 
AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.preCreate({})", entityDef);
+        }
+
+        validateType(entityDef);
+
+        AtlasType type = typeRegistry.getType(entityDef.getName());
+
+        if (type.getTypeCategory() != 
org.apache.atlas.model.TypeCategory.ENTITY) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, 
entityDef.getName(), TypeCategory.CLASS.name());
+        }
+
+
+
+        AtlasVertex ret = 
typeDefStore.findTypeVertexByName(entityDef.getName());
+
+        if (ret != null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, 
entityDef.getName());
+        }
+
+        ret = typeDefStore.createTypeVertex(entityDef);
+
+        updateVertexPreCreate(entityDef, (AtlasEntityType)type, ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.preCreate({}): {}", 
entityDef, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasEntityDef create(AtlasEntityDef entityDef, AtlasVertex 
preCreateResult) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.create({}, {})", entityDef, 
preCreateResult);
+        }
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, entityDef), "create 
entity-def ", entityDef.getName());
+
+        AtlasVertex vertex = (preCreateResult == null) ? preCreate(entityDef) 
: preCreateResult;
+
+        updateVertexAddReferences(entityDef, vertex);
+
+        AtlasEntityDef ret = toEntityDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.create({}, {}): {}", 
entityDef, preCreateResult, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public List<AtlasEntityDef> getAll() throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.getAll()");
+        }
+
+        List<AtlasEntityDef>  ret      = new ArrayList<>();
+        Iterator<AtlasVertex> vertices = 
typeDefStore.findTypeVerticesByCategory(TypeCategory.CLASS);
+
+        while (vertices.hasNext()) {
+            ret.add(toEntityDef(vertices.next()));
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.getAll(): count={}", 
ret.size());
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasEntityDef getByName(String name) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.getByName({})", name);
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
name);
+        }
+
+        vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, 
TypeCategory.class);
+
+        AtlasEntityDef ret = toEntityDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.getByName({}): {}", name, 
ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasEntityDef getByGuid(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.getByGuid({})", guid);
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, 
guid);
+        }
+
+        AtlasEntityDef ret = toEntityDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.getByGuid({}): {}", guid, 
ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasEntityDef update(AtlasEntityDef entityDef) throws 
AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.update({})", entityDef);
+        }
+
+        validateType(entityDef);
+
+        AtlasEntityDef ret = StringUtils.isNotBlank(entityDef.getGuid()) ? 
updateByGuid(entityDef.getGuid(), entityDef)
+                                                                         : 
updateByName(entityDef.getName(), entityDef);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.update({}): {}", entityDef, 
ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasEntityDef updateByName(String name, AtlasEntityDef entityDef) 
throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.updateByName({}, {})", name, 
entityDef);
+        }
+
+        AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update 
entity-def ", name);
+
+        validateType(entityDef);
+
+        AtlasType type = typeRegistry.getType(entityDef.getName());
+
+        if (type.getTypeCategory() != 
org.apache.atlas.model.TypeCategory.ENTITY) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, 
entityDef.getName(), TypeCategory.CLASS.name());
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
name);
+        }
+
+        updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex);
+        updateVertexAddReferences(entityDef, vertex);
+
+        AtlasEntityDef ret = toEntityDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.updateByName({}, {}): {}", 
name, entityDef, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasEntityDef updateByGuid(String guid, AtlasEntityDef entityDef) 
throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.updateByGuid({})", guid);
+        }
+
+        AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update 
entity-def ", (existingDef != null ? existingDef.getName() : guid));
+
+        validateType(entityDef);
+
+        AtlasType type = typeRegistry.getTypeByGuid(guid);
+
+        if (type.getTypeCategory() != 
org.apache.atlas.model.TypeCategory.ENTITY) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, 
entityDef.getName(), TypeCategory.CLASS.name());
+        }
+
+        AtlasVertex vertex = 
typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, 
guid);
+        }
+
+        updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex);
+        updateVertexAddReferences(entityDef, vertex);
+
+        AtlasEntityDef ret = toEntityDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.updateByGuid({}): {}", guid, 
ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByName({})", name);
+        }
+
+        AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete 
entity-def ", name);
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, 
TypeCategory.CLASS);
+
+        if (AtlasGraphUtilsV2.typeHasInstanceVertex(name)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, 
name);
+        }
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
name);
+        }
+
+        // error if we are trying to delete an entityDef that has a 
relationshipDef
+        if (typeDefStore.hasIncomingEdgesWithLabel(ret, 
AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL)){
+            throw new 
AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, name);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByName({}): {}", 
name, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByGuid({})", guid);
+        }
+
+        AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete 
entity-def ", (existingDef != null ? existingDef.getName() : guid));
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, 
TypeCategory.CLASS);
+
+        String typeName = AtlasGraphUtilsV2.getProperty(ret, 
Constants.TYPENAME_PROPERTY_KEY, String.class);
+
+        if (AtlasGraphUtilsV2.typeHasInstanceVertex(typeName)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, 
typeName);
+        }
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, 
guid);
+        }
+
+        // error if we are trying to delete an entityDef that has a 
relationshipDef
+        if (typeDefStore.hasIncomingEdgesWithLabel(ret, 
AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL)){
+            throw new 
AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, typeName);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByGuid({}): {}", 
guid, ret);
+        }
+
+        return ret;
+    }
+
+    private void updateVertexPreCreate(AtlasEntityDef entityDef, 
AtlasEntityType entityType, AtlasVertex vertex) throws AtlasBaseException {
+        AtlasStructDefStoreV2.updateVertexPreCreate(entityDef, entityType, 
vertex, typeDefStore);
+    }
+
+    private void updateVertexPreUpdate(AtlasEntityDef entityDef, 
AtlasEntityType entityType, AtlasVertex vertex)
+        throws AtlasBaseException {
+        AtlasStructDefStoreV2.updateVertexPreUpdate(entityDef, entityType, 
vertex, typeDefStore);
+    }
+
+    private void updateVertexAddReferences(AtlasEntityDef  entityDef, 
AtlasVertex vertex) throws AtlasBaseException {
+        AtlasStructDefStoreV2.updateVertexAddReferences(entityDef, vertex, 
typeDefStore);
+
+        typeDefStore.createSuperTypeEdges(vertex, entityDef.getSuperTypes(), 
TypeCategory.CLASS);
+    }
+
+    private AtlasEntityDef toEntityDef(AtlasVertex vertex) throws 
AtlasBaseException {
+        AtlasEntityDef ret = null;
+
+        if (vertex != null && typeDefStore.isTypeVertex(vertex, 
TypeCategory.CLASS)) {
+            ret = new AtlasEntityDef();
+
+            AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore);
+
+            ret.setSuperTypes(typeDefStore.getSuperTypeNames(vertex));
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
new file mode 100644
index 0000000..6580bee
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
+import org.apache.atlas.repository.store.graph.EntityResolver;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasMapType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV2.class);
+
+    private final AtlasTypeRegistry           typeRegistry;
+    private final EntityGraphDiscoveryContext discoveryContext;
+
+    public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry, 
EntityStream entityStream) {
+        this.typeRegistry     = typeRegistry;
+        this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, 
entityStream);
+    }
+
+    @Override
+    public void init() throws AtlasBaseException {
+        //Nothing to do
+    }
+
+    @Override
+    public EntityGraphDiscoveryContext discoverEntities() throws 
AtlasBaseException {
+        // walk through entities in stream and validate them; record entity 
references
+        discover();
+
+        // resolve entity references discovered in previous step
+        resolveReferences();
+
+        return discoveryContext;
+    }
+
+    @Override
+    public void validateAndNormalize(AtlasEntity entity) throws 
AtlasBaseException {
+        List<String> messages = new ArrayList<>();
+
+        if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
"invalid guid " + entity.getGuid());
+        }
+
+        AtlasEntityType type = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (type == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, 
TypeCategory.ENTITY.name(), entity.getTypeName());
+        }
+
+        type.validateValue(entity, entity.getTypeName(), messages);
+
+        if (!messages.isEmpty()) {
+            throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
+        }
+
+        type.getNormalizedValue(entity);
+    }
+
+    @Override
+    public void validateAndNormalizeForUpdate(AtlasEntity entity) throws 
AtlasBaseException {
+        List<String> messages = new ArrayList<>();
+
+        if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
"invalid guid " + entity.getGuid());
+        }
+
+        AtlasEntityType type = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (type == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, 
TypeCategory.ENTITY.name(), entity.getTypeName());
+        }
+
+        type.validateValueForUpdate(entity, entity.getTypeName(), messages);
+
+        if (!messages.isEmpty()) {
+            throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
+        }
+
+        type.getNormalizedValueForUpdate(entity);
+    }
+
+    @Override
+    public void cleanUp() throws AtlasBaseException {
+        discoveryContext.cleanUp();
+    }
+
+
+    protected void discover() throws AtlasBaseException {
+        EntityStream entityStream = discoveryContext.getEntityStream();
+
+        Set<String> walkedEntities = new HashSet<>();
+
+        // walk through top-level entities and find entity references
+        while (entityStream.hasNext()) {
+            AtlasEntity entity = entityStream.next();
+
+            if (entity == null) {
+                throw new 
AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity");
+            }
+
+            walkEntityGraph(entity);
+
+            walkedEntities.add(entity.getGuid());
+        }
+
+        // walk through entities referenced by other entities
+        // referencedGuids will be updated within this for() loop; avoid use 
of iterators
+        List<String> referencedGuids = discoveryContext.getReferencedGuids();
+        for (int i = 0; i < referencedGuids.size(); i++) {
+            String guid = referencedGuids.get(i);
+
+            if (walkedEntities.contains(guid)) {
+                continue;
+            }
+
+            AtlasEntity entity = entityStream.getByGuid(guid);
+
+            if (entity != null) {
+                walkEntityGraph(entity);
+
+                walkedEntities.add(entity.getGuid());
+            }
+        }
+    }
+
+    protected void resolveReferences() throws AtlasBaseException {
+        EntityResolver[] entityResolvers = new EntityResolver[] { new 
IDBasedEntityResolver(typeRegistry),
+                                                                  new 
UniqAttrBasedEntityResolver(typeRegistry)
+                                                                };
+
+        for (EntityResolver resolver : entityResolvers) {
+            resolver.resolveEntityReferences(discoveryContext);
+        }
+    }
+
+    private void visitReference(AtlasObjectIdType type, Object val) throws 
AtlasBaseException {
+        if (type == null || val == null) {
+            return;
+        }
+
+        if (val instanceof AtlasObjectId) {
+            AtlasObjectId objId = (AtlasObjectId)val;
+
+            if (!AtlasTypeUtil.isValid(objId)) {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
objId.toString());
+            }
+
+            recordObjectReference(objId);
+        } else if (val instanceof Map) {
+            AtlasObjectId objId = new AtlasObjectId((Map)val);
+
+            if (!AtlasTypeUtil.isValid(objId)) {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
objId.toString());
+            }
+
+            recordObjectReference(objId);
+        } else {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
val.toString());
+        }
+    }
+
+    void visitAttribute(AtlasType attrType, Object val) throws 
AtlasBaseException {
+        if (attrType == null || val == null) {
+            return;
+        }
+
+        switch (attrType.getTypeCategory()) {
+            case PRIMITIVE:
+            case ENUM:
+                return;
+
+            case ARRAY: {
+                AtlasArrayType arrayType = (AtlasArrayType) attrType;
+                AtlasType      elemType  = arrayType.getElementType();
+
+                visitCollectionReferences(elemType, val);
+            }
+            break;
+
+            case MAP: {
+                AtlasType keyType   = ((AtlasMapType) attrType).getKeyType();
+                AtlasType valueType = ((AtlasMapType) attrType).getValueType();
+
+                visitMapReferences(keyType, valueType, val);
+            }
+            break;
+
+            case STRUCT:
+                visitStruct((AtlasStructType)attrType, val);
+            break;
+
+            case OBJECT_ID_TYPE:
+                visitReference((AtlasObjectIdType) attrType,  val);
+            break;
+
+            default:
+                throw new 
AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, 
attrType.getTypeCategory().name());
+        }
+    }
+
+    void visitMapReferences(AtlasType keyType, AtlasType valueType, Object 
val) throws AtlasBaseException {
+        if (keyType == null || valueType == null || val == null) {
+            return;
+        }
+
+        if (isPrimitive(keyType.getTypeCategory()) && 
isPrimitive(valueType.getTypeCategory())) {
+            return;
+        }
+
+        if (Map.class.isAssignableFrom(val.getClass())) {
+            Iterator<Map.Entry> it = ((Map) val).entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry e = it.next();
+                visitAttribute(keyType, e.getKey());
+                visitAttribute(valueType, e.getValue());
+            }
+        }
+    }
+
+    void visitCollectionReferences(AtlasType elemType, Object val) throws 
AtlasBaseException {
+        if (elemType == null || val == null || 
isPrimitive(elemType.getTypeCategory())) {
+            return;
+        }
+
+        Iterator it = null;
+
+        if (val instanceof Collection) {
+            it = ((Collection) val).iterator();
+        } else if (val instanceof Iterable) {
+            it = ((Iterable) val).iterator();
+        } else if (val instanceof Iterator) {
+            it = (Iterator) val;
+        }
+
+        if (it != null) {
+            while (it.hasNext()) {
+                Object elem = it.next();
+                visitAttribute(elemType, elem);
+            }
+        }
+    }
+
+    void visitStruct(AtlasStructType structType, Object val) throws 
AtlasBaseException {
+        if (structType == null || val == null) {
+            return;
+        }
+
+        final AtlasStruct struct;
+
+        if (val instanceof AtlasStruct) {
+            struct = (AtlasStruct) val;
+        } else if (val instanceof Map) {
+            Map attributes = AtlasTypeUtil.toStructAttributes((Map) val);
+
+            struct = new AtlasStruct(structType.getTypeName(), attributes);
+        } else {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, 
val.toString());
+        }
+
+        visitStruct(structType, struct);
+    }
+
+    void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws 
AtlasBaseException {
+        List<String> visitedAttributes = new ArrayList<>();
+
+        // visit relationship attributes
+        visitRelationships(entityType, entity, visitedAttributes);
+
+        // visit struct attributes
+        for (AtlasAttribute attribute : 
entityType.getAllAttributes().values()) {
+            AtlasType attrType = attribute.getAttributeType();
+            String    attrName = attribute.getName();
+            Object    attrVal  = entity.getAttribute(attrName);
+
+            if (entity.hasAttribute(attrName) && 
!visitedAttributes.contains(attrName)) {
+                visitAttribute(attrType, attrVal);
+            }
+        }
+    }
+
+    private void visitRelationships(AtlasEntityType entityType, AtlasEntity 
entity, List<String> visitedAttributes) throws AtlasBaseException {
+        for (AtlasAttribute attribute : 
entityType.getRelationshipAttributes().values()) {
+            AtlasType attrType = attribute.getAttributeType();
+            String attrName = attribute.getName();
+            Object attrVal = entity.getRelationshipAttribute(attrName);
+
+            if (entity.hasRelationshipAttribute(attrName)) {
+                visitAttribute(attrType, attrVal);
+
+                visitedAttributes.add(attrName);
+            }
+        }
+    }
+
+    void visitStruct(AtlasStructType structType, AtlasStruct struct) throws 
AtlasBaseException {
+        for (AtlasAttribute attribute : 
structType.getAllAttributes().values()) {
+            AtlasType attrType = attribute.getAttributeType();
+            Object    attrVal  = struct.getAttribute(attribute.getName());
+
+            visitAttribute(attrType, attrVal);
+        }
+    }
+
+    void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException {
+        if (entity == null) {
+            return;
+        }
+
+        AtlasEntityType type = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (type == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, 
TypeCategory.ENTITY.name(), entity.getTypeName());
+        }
+
+        recordObjectReference(entity.getGuid());
+
+        visitEntity(type, entity);
+    }
+
+
+    boolean isPrimitive(TypeCategory typeCategory) {
+        return typeCategory == TypeCategory.PRIMITIVE || typeCategory == 
TypeCategory.ENUM;
+    }
+
+    private void recordObjectReference(String guid) {
+        discoveryContext.addReferencedGuid(guid);
+    }
+
+    private void recordObjectReference(AtlasObjectId objId) {
+        if (AtlasTypeUtil.isValidGuid(objId)) {
+            discoveryContext.addReferencedGuid(objId.getGuid());
+        } else {
+            discoveryContext.addReferencedByUniqAttribs(objId);
+        }
+    }
+}

Reply via email to