http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java new file mode 100644 index 0000000..5e33cf5 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -0,0 +1,780 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.atlas.RequestContext; +import org.apache.atlas.annotation.GraphTransaction; +import org.apache.atlas.authorize.AtlasEntityAccessRequest; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.*; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AtlasEntityUtil; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.*; + +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; + + +@Component +public class AtlasEntityStoreV2 implements AtlasEntityStore { + private static final 
Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore"); + + + private final DeleteHandlerV1 deleteHandler; + private final AtlasTypeRegistry typeRegistry; + private final AtlasEntityChangeNotifier entityChangeNotifier; + private final EntityGraphMapper entityGraphMapper; + private final EntityGraphRetriever entityRetriever; + + @Inject + public AtlasEntityStoreV2(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, + AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) { + this.deleteHandler = deleteHandler; + this.typeRegistry = typeRegistry; + this.entityChangeNotifier = entityChangeNotifier; + this.entityGraphMapper = entityGraphMapper; + this.entityRetriever = new EntityGraphRetriever(typeRegistry); + } + + @Override + @GraphTransaction + public List<String> getEntityGUIDS(final String typename) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getEntityGUIDS({})", typename); + } + + if (StringUtils.isEmpty(typename) || !typeRegistry.isRegisteredType(typename)) { + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME); + } + + List<String> ret = AtlasGraphUtilsV2.findEntityGUIDsByType(typename); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getEntityGUIDS({})", typename); + } + + return ret; + } + + @Override + @GraphTransaction + public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getById({})", guid); + } + + AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: guid=", guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getById({}): {}", guid, ret); + } + + return ret; + } + + @Override + @GraphTransaction + public 
AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getByIds({})", guids); + } + + AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids); + + // verify authorization to read the entities + if(ret != null){ + for(String guid : guids){ + AtlasEntity entity = ret.getEntity(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(entity)), "read entity: guid=", guid); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getByIds({}): {}", guids, ret); + } + + return ret; + } + + @Override + @GraphTransaction + public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes); + } + + AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(entityType, uniqAttributes); + AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret); + } + + return ret; + } + + @Override + @GraphTransaction + public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException { + return createOrUpdate(entityStream, isPartialUpdate, false); + } + + @Override + @GraphTransaction + public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException { + return createOrUpdate(entityStream, false, true); + } + + 
@Override + @GraphTransaction + public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> updateEntity({}, {}, {})", objectId, updatedEntityInfo, isPartialUpdate); + } + + if (objectId == null || updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "null entity-id/entity"); + } + + final String guid; + + if (AtlasTypeUtil.isAssignedGuid(objectId.getGuid())) { + guid = objectId.getGuid(); + } else { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName()); + + if (entityType == null) { + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, objectId.getTypeName()); + } + + guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(objectId.getTypeName()), objectId.getUniqueAttributes()); + } + + AtlasEntity entity = updatedEntityInfo.getEntity(); + + entity.setGuid(guid); + + return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), isPartialUpdate, false); + } + + @Override + @GraphTransaction + public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes, + AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> updateByUniqueAttributes({}, {})", entityType.getTypeName(), uniqAttributes); + } + + if (updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update."); + } + + String guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, uniqAttributes); + AtlasEntity entity = updatedEntityInfo.getEntity(); + + entity.setGuid(guid); + + return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false); + } + + @Override + @GraphTransaction + 
public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue) + throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue); + } + + AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); + AtlasAttribute attr = entityType.getAttribute(attrName); + + if (attr == null) { + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName()); + } + + AtlasType attrType = attr.getAttributeType(); + AtlasEntity updateEntity = new AtlasEntity(); + + updateEntity.setGuid(guid); + updateEntity.setTypeName(entity.getTypeName()); + + switch (attrType.getTypeCategory()) { + case PRIMITIVE: + updateEntity.setAttribute(attrName, attrValue); + break; + case OBJECT_ID_TYPE: + AtlasObjectId objId; + + if (attrValue instanceof String) { + objId = new AtlasObjectId((String) attrValue, attr.getAttributeDef().getTypeName()); + } else { + objId = (AtlasObjectId) attrType.getNormalizedValue(attrValue); + } + + updateEntity.setAttribute(attrName, objId); + break; + + default: + throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName()); + } + + return createOrUpdate(new AtlasEntityStream(updateEntity), true, false); + } + + @Override + @GraphTransaction + public EntityMutationResponse deleteById(final String guid) throws AtlasBaseException { + if (StringUtils.isEmpty(guid)) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + } + + Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid); + + if (vertex != null) { + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); + + AtlasAuthorizationUtils.verifyAccess(new 
AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid); + + deletionCandidates.add(vertex); + } else { + if (LOG.isDebugEnabled()) { + // Entity does not exist - treat as non-error, since the caller + // wanted to delete the entity and it's already gone. + LOG.debug("Deletion request ignored for non-existent entity with guid " + guid); + } + } + + EntityMutationResponse ret = deleteVertices(deletionCandidates); + + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret, false); + + return ret; + } + + @Override + @GraphTransaction + public EntityMutationResponse deleteByIds(final List<String> guids) throws AtlasBaseException { + if (CollectionUtils.isEmpty(guids)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); + } + + Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); + + for (String guid : guids) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid); + + if (vertex == null) { + if (LOG.isDebugEnabled()) { + // Entity does not exist - treat as non-error, since the caller + // wanted to delete the entity and it's already gone. 
+ LOG.debug("Deletion request ignored for non-existent entity with guid " + guid); + } + + continue; + } + + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid); + + deletionCandidates.add(vertex); + } + + if (deletionCandidates.isEmpty()) { + LOG.info("No deletion candidate entities were found for guids %s", guids); + } + + EntityMutationResponse ret = deleteVertices(deletionCandidates); + + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret, false); + + return ret; + } + + @Override + @GraphTransaction + public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException { + if (MapUtils.isEmpty(uniqAttributes)) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString()); + } + + Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); + AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqAttributes); + + if (vertex != null) { + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes); + + deletionCandidates.add(vertex); + } else { + if (LOG.isDebugEnabled()) { + // Entity does not exist - treat as non-error, since the caller + // wanted to delete the entity and it's already gone. 
+ LOG.debug("Deletion request ignored for non-existent entity with uniqueAttributes " + uniqAttributes); + } + } + + EntityMutationResponse ret = deleteVertices(deletionCandidates); + + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret, false); + + return ret; + } + + @Override + @GraphTransaction + public String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException{ + return AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, uniqAttributes); + } + + @Override + @GraphTransaction + public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException { + if (StringUtils.isEmpty(guid)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); + } + + if (CollectionUtils.isEmpty(classifications)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding classifications={} to entity={}", classifications, guid); + } + + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + for (AtlasClassification classification : classifications) { + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification), + "add classification: guid=", guid, ", classification=", classification.getTypeName()); + } + + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); + for (AtlasClassification classification : classifications) { + validateAndNormalize(classification); + } + + // validate if entity, not already associated with classifications + validateEntityAssociations(guid, classifications); + + entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications); + } + + @Override + @GraphTransaction + public void updateClassifications(String 
guid, List<AtlasClassification> classifications) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating classifications={} for entity={}", classifications, guid); + } + + if (StringUtils.isEmpty(guid)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified"); + } + + if (CollectionUtils.isEmpty(classifications)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); + } + + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + for (AtlasClassification classification : classifications) { + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, classification), "update classification: guid=", guid, ", classification=", classification.getTypeName()); + } + + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); + + entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications); + } + + @Override + @GraphTransaction + public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException { + if (CollectionUtils.isEmpty(guids)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); + } + if (classification == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified"); + } + + for (String guid : guids) { + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification), + "add classification: guid=", guid, ", classification=", classification.getTypeName()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding classification={} to entities={}", classification, guids); + } + + 
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids); + + validateAndNormalize(classification); + + List<AtlasClassification> classifications = Collections.singletonList(classification); + + for (String guid : guids) { + validateEntityAssociations(guid, classifications); + + entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications); + } + } + + @Override + @GraphTransaction + public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException { + if (StringUtils.isEmpty(guid)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); + } + if (CollectionUtils.isEmpty(classificationNames)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); + } + + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + for (String classification : classificationNames) { + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_CLASSIFICATION, entityHeader, new AtlasClassification(classification)), "remove classification: guid=", guid, ", classification=", classification); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid); + } + + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); + + entityGraphMapper.deleteClassifications(guid, classificationNames); + } + + + @GraphTransaction + public List<AtlasClassification> retrieveClassifications(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("Retriving classifications for entity={}", guid); + } + + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + return entityHeader.getClassifications(); + } + + + @Override + @GraphTransaction + public List<AtlasClassification> getClassifications(String guid) 
throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("Getting classifications for entity={}", guid); + } + + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, entityHeader), "get classifications: guid=", guid); + + return entityHeader.getClassifications(); + } + + @Override + @GraphTransaction + public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("Getting classifications for entities={}", guid); + } + + AtlasClassification ret = null; + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + if (CollectionUtils.isNotEmpty(entityHeader.getClassifications())) { + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, entityHeader), "get classification: guid=", guid, ", classification=", classificationName); + + for (AtlasClassification classification : entityHeader.getClassifications()) { + if (!StringUtils.equalsIgnoreCase(classification.getTypeName(), classificationName)) { + continue; + } + + if (StringUtils.isEmpty(classification.getEntityGuid()) || StringUtils.equalsIgnoreCase(classification.getEntityGuid(), guid)) { + ret = classification; + break; + } else if (ret == null) { + ret = classification; + } + } + } + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName); + } + + return ret; + } + + private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> createOrUpdate()"); + } + + if (entityStream == null || !entityStream.hasNext()) { + throw new 
AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); + } + + AtlasPerfTracer perf = null; + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()"); + } + + try { + final boolean isImport = entityStream instanceof EntityImportStream; + final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate); + + // Check if authorized to create entities + if (!isImport && CollectionUtils.isNotEmpty(context.getCreatedEntities())) { + for (AtlasEntity entity : context.getCreatedEntities()) { + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)), + "create entity: type=", entity.getTypeName()); + } + } + + // for existing entities, skip update if incoming entity doesn't have any change + if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) { + List<AtlasEntity> entitiesToSkipUpdate = null; + + for (AtlasEntity entity : context.getUpdatedEntities()) { + String guid = entity.getGuid(); + AtlasVertex vertex = context.getVertex(guid); + AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) { + // if classifications are to be replaced as well, then skip updates only when no change in classifications as well + if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) { + if (entitiesToSkipUpdate == null) { + entitiesToSkipUpdate = new ArrayList<>(); + } + + entitiesToSkipUpdate.add(entity); + } + } + } + + if (entitiesToSkipUpdate != null) { + context.getUpdatedEntities().removeAll(entitiesToSkipUpdate); + } + + // Check if authorized to update entities + if (!isImport) { + for (AtlasEntity entity : 
context.getUpdatedEntities()) { + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)), + "update entity: type=", entity.getTypeName()); + } + } + } + + EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications); + + ret.setGuidAssignments(context.getGuidAssignments()); + + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret, isImport); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== createOrUpdate()"); + } + + return ret; + } finally { + AtlasPerfTracer.log(perf); + } + } + + private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException { + EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream); + EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities(); + EntityMutationContext context = new EntityMutationContext(discoveryContext); + + for (String guid : discoveryContext.getReferencedGuids()) { + AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid); + AtlasEntity entity = entityStream.getByGuid(guid); + + if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream + if (vertex != null) { + if (!isPartialUpdate) { + graphDiscoverer.validateAndNormalize(entity); + } else { + graphDiscoverer.validateAndNormalizeForUpdate(entity); + } + + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + String guidVertex = AtlasGraphUtilsV2.getIdFromVertex(vertex); + + if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute + entity.setGuid(guidVertex); + } + + context.addUpdated(guid, entity, entityType, vertex); + } else { + graphDiscoverer.validateAndNormalize(entity); + + AtlasEntityType 
entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + + //Create vertices which do not exist in the repository + if ((entityStream instanceof EntityImportStream) && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) { + vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid()); + } else { + vertex = entityGraphMapper.createVertex(entity); + } + + discoveryContext.addResolvedGuid(guid, vertex); + + String generatedGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex); + + entity.setGuid(generatedGuid); + + context.addCreated(guid, entity, entityType, vertex); + } + + // during import, update the system attributes + if (entityStream instanceof EntityImportStream) { + entityGraphMapper.updateSystemAttributes(vertex, entity); + } + } + } + + return context; + } + + private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException { + EntityMutationResponse response = new EntityMutationResponse(); + RequestContext req = RequestContext.get(); + + deleteHandler.deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities + + for (AtlasObjectId entity : req.getDeletedEntities()) { + response.addEntity(DELETE, entity); + } + + for (AtlasObjectId entity : req.getUpdatedEntities()) { + response.addEntity(UPDATE, entity); + } + + return response; + } + + private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException { + AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName()); + + if (type == null) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName()); + } + + List<String> messages = new ArrayList<>(); + + type.validateValue(classification, classification.getTypeName(), messages); + + if (!messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages); + } + + type.getNormalizedValue(classification); + } 
+ + /** + * Validate if classification is not already associated with the entities + * + * @param guid unique entity id + * @param classifications list of classifications to be associated + */ + private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { + List<String> entityClassifications = getClassificationNames(guid); + String entityTypeName = AtlasGraphUtilsV2.getTypeNameFromGuid(guid); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + + for (AtlasClassification classification : classifications) { + String newClassification = classification.getTypeName(); + + if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid + + ", already associated with classification: " + newClassification); + } + + // for each classification, check whether there are entities it should be restricted to + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(newClassification); + + if (!classificationType.canApplyToEntityType(entityType)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_FOR_CLASSIFICATION, guid, entityTypeName, newClassification); + } + } + } + + private List<String> getClassificationNames(String guid) throws AtlasBaseException { + List<String> ret = null; + List<AtlasClassification> classifications = retrieveClassifications(guid); + + if (CollectionUtils.isNotEmpty(classifications)) { + ret = new ArrayList<>(); + + for (AtlasClassification classification : classifications) { + String entityGuid = classification.getEntityGuid(); + + if (StringUtils.isEmpty(entityGuid) || StringUtils.equalsIgnoreCase(guid, entityGuid)) { + ret.add(classification.getTypeName()); + } + } + } + + return ret; + } +}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java new file mode 100644 index 0000000..75a7e61 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; + +import java.util.Iterator; +
/**
 * {@link EntityStream} that iterates over the entities held by an
 * {@link AtlasEntitiesWithExtInfo}.
 *
 * <p>An optional delegate {@link EntityStream} can be supplied; when it is
 * present, {@link #getByGuid(String)} is answered by the delegate instead of
 * the wrapped {@code AtlasEntitiesWithExtInfo}.
 */
public class AtlasEntityStream implements EntityStream {
    /** Entities this stream iterates over. */
    protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;

    /** Optional delegate used for guid lookups; {@code null} when none was supplied. */
    protected final EntityStream entityStream;

    /** Current iteration cursor; replaced by {@link #reset()}. */
    private Iterator<AtlasEntity> iterator;


    /**
     * Creates a stream over a single entity.
     *
     * @param entity the entity to wrap
     */
    public AtlasEntityStream(AtlasEntity entity) {
        this(new AtlasEntitiesWithExtInfo(entity));
    }

    /**
     * Creates a stream over a single entity together with its extended info.
     *
     * @param entityWithExtInfo the entity (and ext-info) to wrap
     */
    public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
        this(new AtlasEntitiesWithExtInfo(entityWithExtInfo));
    }

    /**
     * Creates a stream over the given entities, without a delegate stream.
     *
     * @param entitiesWithExtInfo the entities to iterate over
     */
    public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
        this.entitiesWithExtInfo = entitiesWithExtInfo;
        this.iterator            = this.entitiesWithExtInfo.getEntities().iterator();
        this.entityStream        = null;
    }

    /**
     * Creates a stream over a single entity, using {@code entityStream} for guid lookups.
     *
     * @param entity       the entity to wrap
     * @param entityStream delegate consulted by {@link #getByGuid(String)}; may be {@code null}
     */
    public AtlasEntityStream(AtlasEntity entity, EntityStream entityStream) {
        this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entity);
        this.iterator            = this.entitiesWithExtInfo.getEntities().iterator();
        this.entityStream        = entityStream;
    }

    /**
     * Creates a stream over a single entity with ext-info, using {@code entityStream} for guid lookups.
     *
     * @param entityWithExtInfo the entity (and ext-info) to wrap
     * @param entityStream      delegate consulted by {@link #getByGuid(String)}; may be {@code null}
     */
    public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
        this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entityWithExtInfo);
        this.iterator            = this.entitiesWithExtInfo.getEntities().iterator();
        this.entityStream        = entityStream;
    }

    /** @return {@code true} while the underlying iterator has more entities */
    @Override
    public boolean hasNext() {
        return iterator.hasNext();
    }

    /** @return the next entity, or {@code null} once the stream is exhausted */
    @Override
    public AtlasEntity next() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    /** Restarts iteration from the first entity. */
    @Override
    public void reset() {
        this.iterator = entitiesWithExtInfo.getEntities().iterator();
    }

    /**
     * Looks up an entity by guid.
     *
     * @param guid guid of the entity to find
     * @return the delegate stream's answer when a delegate was supplied,
     *         otherwise the result of looking the guid up in the wrapped entities
     */
    @Override
    public AtlasEntity getByGuid(String guid) {
        return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
    }

    @Override
    public String toString() {
        // StringBuilder: the buffer never leaves this method, so StringBuffer's locking is pure overhead
        StringBuilder sb = new StringBuilder("AtlasEntityStream{");

        sb.append("entitiesWithExtInfo=").append(entitiesWithExtInfo);
        sb.append(", iterator=").append(iterator);
        sb.append('}');

        return sb.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java new file mode 100644 index 0000000..6bf962e --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +
/**
 * {@link EntityImportStream} view over a single {@link AtlasEntityWithExtInfo}.
 *
 * <p>Positioning callbacks are no-ops and {@link #size()} is always 1.
 */
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
    /** Number of times {@link #getNextEntityWithExtInfo()} has been invoked. */
    private int currentPosition;

    /**
     * @param entityWithExtInfo the entity (with ext-info) to expose
     * @param entityStream      fallback stream consulted by {@link #getByGuid(String)}; may be {@code null}
     */
    public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
        super(entityWithExtInfo, entityStream);
    }

    /**
     * Advances the position counter and returns the next entity wrapped with
     * this stream's ext-info.
     *
     * @return the next entity with ext-info, or {@code null} when the stream is exhausted
     */
    @Override
    public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
        // the counter moves on every call, including calls made after exhaustion
        currentPosition += 1;

        final AtlasEntity nextEntity = next();

        if (nextEntity == null) {
            return null;
        }

        return new AtlasEntityWithExtInfo(nextEntity, entitiesWithExtInfo);
    }

    /**
     * Looks the guid up locally first and only then in the fallback stream.
     *
     * @param guid guid of the entity to find
     * @return the matching entity, or {@code null} if neither source knows it
     */
    @Override
    public AtlasEntity getByGuid(String guid) {
        final AtlasEntity localMatch = entitiesWithExtInfo.getEntity(guid);

        if (localMatch != null || entityStream == null) {
            return localMatch;
        }

        return entityStream.getByGuid(guid);
    }

    /** @return always 1 */
    @Override
    public int size() {
        return 1;
    }

    @Override
    public void setPosition(int position) {
        // not applicable for a single entity stream
    }

    /** @return how many times {@link #getNextEntityWithExtInfo()} has been called */
    @Override
    public int getPosition() {
        return currentPosition;
    }

    @Override
    public void setPositionUsingEntityGuid(String guid) {
        // intentionally empty
    }

    @Override
    public void onImportComplete(String guid) {
        // intentionally empty
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java new file mode 100644 index 0000000..7d7233f --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.AtlasEnumDef; +import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.authorize.AtlasTypeAccessRequest; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * EnumDef store in v1 format. 
+ */
class AtlasEnumDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEnumDef> {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasEnumDefStoreV2.class);

    /**
     * @param typeDefStore graph store used to find, create and update type vertices
     * @param typeRegistry registry used to look up existing enum definitions
     */
    public AtlasEnumDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) {
        super(typeDefStore, typeRegistry);
    }

    /**
     * Validates the enum-def, creates its type vertex and writes the element values to it.
     *
     * @param enumDef definition to create
     * @return the newly created type vertex
     * @throws AtlasBaseException if a type with the same name already exists, or the definition is invalid
     */
    @Override
    public AtlasVertex preCreate(AtlasEnumDef enumDef) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.preCreate({})", enumDef);
        }

        validateType(enumDef);

        AtlasVertex vertex = typeDefStore.findTypeVertexByName(enumDef.getName());

        if (vertex != null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, enumDef.getName());
        }

        vertex = typeDefStore.createTypeVertex(enumDef);

        toVertex(enumDef, vertex);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.preCreate({}): {}", enumDef, vertex);
        }

        return vertex;
    }

    /**
     * Completes creation of an enum-def after an access check.
     *
     * @param enumDef         definition to create
     * @param preCreateResult vertex returned by {@link #preCreate(AtlasEnumDef)}; when {@code null},
     *                        {@code preCreate} is invoked here
     * @return the enum-def as read back from the vertex
     * @throws AtlasBaseException if the access check or {@code preCreate} fails
     */
    @Override
    public AtlasEnumDef create(AtlasEnumDef enumDef, AtlasVertex preCreateResult) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.create({}, {})", enumDef, preCreateResult);
        }

        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, enumDef), "create enum-def ", enumDef.getName());

        AtlasVertex vertex = (preCreateResult == null) ? preCreate(enumDef) : preCreateResult;

        AtlasEnumDef ret = toEnumDef(vertex);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.create({}, {}): {}", enumDef, preCreateResult, ret);
        }

        return ret;
    }

    /**
     * @return every enum-def found among the type vertices of category ENUM
     * @throws AtlasBaseException declared by the store contract
     */
    @Override
    public List<AtlasEnumDef> getAll() throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.getAll()");
        }

        List<AtlasEnumDef> ret = new ArrayList<>();

        Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.ENUM);
        while (vertices.hasNext()) {
            ret.add(toEnumDef(vertices.next()));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.getAll(): count={}", ret.size());
        }

        return ret;
    }

    /**
     * @param name enum type name
     * @return the enum-def stored under {@code name}
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} when no ENUM type vertex has that name
     */
    @Override
    public AtlasEnumDef getByName(String name) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.getByName({})", name);
        }

        AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.ENUM);

        if (vertex == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }

        // NOTE(review): the result of this read is discarded; kept as-is in case the
        // property access is relied upon as a sanity check - confirm and remove if not
        vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class);

        AtlasEnumDef ret = toEnumDef(vertex);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.getByName({}): {}", name, ret);
        }

        return ret;
    }

    /**
     * @param guid enum type guid
     * @return the enum-def stored under {@code guid}
     * @throws AtlasBaseException with {@code TYPE_GUID_NOT_FOUND} when no ENUM type vertex has that guid
     */
    @Override
    public AtlasEnumDef getByGuid(String guid) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.getByGuid({})", guid);
        }

        AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.ENUM);

        if (vertex == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }

        AtlasEnumDef ret = toEnumDef(vertex);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.getByGuid({}): {}", guid, ret);
        }

        return ret;
    }

    /**
     * Updates an enum-def, addressing it by guid when one is set and by name otherwise.
     *
     * @param enumDef definition carrying the new state
     * @return the enum-def as read back after the update
     * @throws AtlasBaseException if validation, the access check or the lookup fails
     */
    @Override
    public AtlasEnumDef update(AtlasEnumDef enumDef) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.update({})", enumDef);
        }

        validateType(enumDef);

        AtlasEnumDef ret = StringUtils.isNotBlank(enumDef.getGuid()) ? updateByGuid(enumDef.getGuid(), enumDef)
                                                                     : updateByName(enumDef.getName(), enumDef);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.update({}): {}", enumDef, ret);
        }

        return ret;
    }

    /**
     * @param name    name of the enum type to update
     * @param enumDef definition carrying the new state
     * @return the enum-def as read back after the update
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} when the type vertex is missing,
     *                            or when validation / the access check fails
     */
    @Override
    public AtlasEnumDef updateByName(String name, AtlasEnumDef enumDef) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.updateByName({}, {})", name, enumDef);
        }

        AtlasEnumDef existingDef = typeRegistry.getEnumDefByName(name);

        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update enum-def ", name);

        validateType(enumDef);

        AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.ENUM);

        if (vertex == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }

        typeDefStore.updateTypeVertex(enumDef, vertex);

        toVertex(enumDef, vertex);

        AtlasEnumDef ret = toEnumDef(vertex);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.updateByName({}, {}): {}", name, enumDef, ret);
        }

        return ret;
    }

    /**
     * @param guid    guid of the enum type to update
     * @param enumDef definition carrying the new state
     * @return the enum-def as read back after the update
     * @throws AtlasBaseException with {@code TYPE_GUID_NOT_FOUND} when the type vertex is missing,
     *                            or when validation / the access check fails
     */
    @Override
    public AtlasEnumDef updateByGuid(String guid, AtlasEnumDef enumDef) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasEnumDefStoreV2.updateByGuid({})", guid);
        }

        AtlasEnumDef existingDef = typeRegistry.getEnumDefByGuid(guid);

        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update enum-def ", (existingDef != null ? existingDef.getName() : guid));

        validateType(enumDef);

        AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.ENUM);

        if (vertex == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }

        typeDefStore.updateTypeVertex(enumDef, vertex);

        toVertex(enumDef, vertex);

        AtlasEnumDef ret = toEnumDef(vertex);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasEnumDefStoreV2.updateByGuid({}): {}", guid, ret);
        }

        return ret;
    }

    /**
     * Locates the type vertex to delete and checks delete access.
     *
     * @param name name of the enum type to delete
     * @return the type vertex of the enum
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} when the vertex is missing,
     *                            or when the access check fails
     */
    @Override
    public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
        AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.ENUM);

        if (vertex == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }

        AtlasEnumDef existingDef = typeRegistry.getEnumDefByName(name);

        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete enum-def ", (existingDef != null ? existingDef.getName() : name));

        return vertex;
    }

    /**
     * Locates the type vertex to delete and checks delete access.
     *
     * @param guid guid of the enum type to delete
     * @return the type vertex of the enum
     * @throws AtlasBaseException with {@code TYPE_GUID_NOT_FOUND} when the vertex is missing,
     *                            or when the access check fails
     */
    @Override
    public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
        AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.ENUM);

        if (vertex == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }

        AtlasEnumDef existingDef = typeRegistry.getEnumDefByGuid(guid);

        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete enum-def ", (existingDef != null ? existingDef.getName() : guid));

        return vertex;
    }

    /**
     * Writes the enum elements to the vertex: one property per element holding its ordinal,
     * an optional description property per element, and one property listing all element values.
     *
     * @throws AtlasBaseException with {@code MISSING_MANDATORY_ATTRIBUTE} when the enum has no elements
     *                            or an element lacks a value or an ordinal
     */
    private void toVertex(AtlasEnumDef enumDef, AtlasVertex vertex) throws AtlasBaseException {
        if (CollectionUtils.isEmpty(enumDef.getElementDefs())) {
            throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, enumDef.getName(), "values");
        }

        List<String> values = new ArrayList<>(enumDef.getElementDefs().size());

        for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
            // Validate the enum element
            if (StringUtils.isEmpty(element.getValue()) || null == element.getOrdinal()) {
                throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, enumDef.getName(), "elementValue");
            }

            String elemKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, element.getValue());

            AtlasGraphUtilsV2.setProperty(vertex, elemKey, element.getOrdinal());

            if (StringUtils.isNotBlank(element.getDescription())) {
                String descKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(elemKey, "description");

                AtlasGraphUtilsV2.setProperty(vertex, descKey, element.getDescription());
            }

            values.add(element.getValue());
        }
        AtlasGraphUtilsV2.setProperty(vertex, AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef), values);
    }

    /**
     * @return the enum-def read from {@code vertex}, or {@code null} when the vertex is
     *         {@code null} or is not an ENUM type vertex
     */
    private AtlasEnumDef toEnumDef(AtlasVertex vertex) {
        AtlasEnumDef ret = null;

        if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.ENUM)) {
            ret = toEnumDef(vertex, new AtlasEnumDef(), typeDefStore);
        }

        return ret;
    }

    /**
     * Populates an enum-def from a type vertex.
     *
     * @param vertex       type vertex to read
     * @param enumDef      instance to fill; a new one is created when {@code null}
     * @param typeDefStore store used to copy the common type-def properties
     * @return the populated enum-def; its element list is empty when the vertex
     *         carries no element-values property
     */
    private static AtlasEnumDef toEnumDef(AtlasVertex vertex, AtlasEnumDef enumDef, AtlasTypeDefGraphStoreV2 typeDefStore) {
        AtlasEnumDef ret = enumDef != null ? enumDef : new AtlasEnumDef();

        typeDefStore.vertexToTypeDef(vertex, ret);

        List<AtlasEnumElementDef> elements   = new ArrayList<>();
        List<String>              elemValues = vertex.getProperty(AtlasGraphUtilsV2.getTypeDefPropertyKey(ret), List.class);

        // the values property may be absent; iterating a null list would throw NullPointerException
        if (elemValues != null) {
            for (String elemValue : elemValues) {
                String elemKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(ret, elemValue);
                String descKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(elemKey, "description");

                Integer ordinal = AtlasGraphUtilsV2.getProperty(vertex, elemKey, Integer.class);
                String  desc    = AtlasGraphUtilsV2.getProperty(vertex, descKey, String.class);

                elements.add(new AtlasEnumElementDef(elemValue, desc, ordinal));
            }
        }
        ret.setElementDefs(elements);

        return ret;
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java new file mode 100644 index 0000000..e148aa7 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.SortOrder; +import org.apache.atlas.discovery.SearchProcessor; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasElement; +import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static 
org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; +import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; +import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*; +
/**
 * Utility methods for Graph: property-key / edge-label naming, property access on
 * graph elements, and vertex lookups by guid, type or unique attribute.
 */
public class AtlasGraphUtilsV2 {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasGraphUtilsV2.class);

    public static final String PROPERTY_PREFIX             = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "type.";
    // NOTE(review): PROPERTY_PREFIX already ends with '.', so the three labels below contain
    // two consecutive dots. Left unchanged because these values may already be persisted as
    // edge labels - confirm before normalising.
    public static final String SUPERTYPE_EDGE_LABEL        = PROPERTY_PREFIX + ".supertype";
    public static final String ENTITYTYPE_EDGE_LABEL       = PROPERTY_PREFIX + ".entitytype";
    public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".relationshipType";
    public static final String VERTEX_TYPE                 = "typeSystem";

    private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = false;
    // start from the documented default so a configuration failure below cannot leave this
    // null (which would otherwise be concatenated into index queries as the text "null")
    private static String  INDEX_SEARCH_PREFIX = INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;

    static {
        try {
            Configuration conf = ApplicationProperties.get();

            USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = conf.getBoolean("atlas.use.index.query.to.find.entity.by.unique.attributes", USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
            INDEX_SEARCH_PREFIX                                 = conf.getString(INDEX_SEARCH_VERTEX_PREFIX_PROPERTY, INDEX_SEARCH_VERTEX_PREFIX_DEFAULT);
        } catch (Exception excp) {
            // best effort: keep the defaults assigned above
            LOG.error("Error reading configuration", excp);
        } finally {
            LOG.info("atlas.use.index.query.to.find.entity.by.unique.attributes=" + USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
        }
    }

    /** @return the property key for the given type-def: {@link #PROPERTY_PREFIX} + type name */
    public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) {
        return getTypeDefPropertyKey(typeDef.getName());
    }

    /** @return the property key for a child of the given type-def: prefix + type name + "." + child */
    public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef, String child) {
        return getTypeDefPropertyKey(typeDef.getName(), child);
    }

    /** @return {@link #PROPERTY_PREFIX} + {@code typeName} */
    public static String getTypeDefPropertyKey(String typeName) {
        return PROPERTY_PREFIX + typeName;
    }

    /** @return {@link #PROPERTY_PREFIX} + {@code typeName} + "." + {@code child} */
    public static String getTypeDefPropertyKey(String typeName, String child) {
        return PROPERTY_PREFIX + typeName + "." + child;
    }

    /** @return the value of the guid property of the vertex */
    public static String getIdFromVertex(AtlasVertex vertex) {
        return vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
    }

    /** @return the value of the guid property of the edge */
    public static String getIdFromEdge(AtlasEdge edge) {
        return edge.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
    }

    /** @return the value of the entity-type property of the element */
    public static String getTypeName(AtlasElement element) {
        return element.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
    }

    /** @return {@link #PROPERTY_PREFIX} + "edge." + {@code fromNode} + "." + {@code toNode} */
    public static String getEdgeLabel(String fromNode, String toNode) {
        return PROPERTY_PREFIX + "edge." + fromNode + "." + toNode;
    }

    /** @return {@code GraphHelper.EDGE_LABEL_PREFIX} + {@code property} */
    public static String getEdgeLabel(String property) {
        return GraphHelper.EDGE_LABEL_PREFIX + property;
    }

    /**
     * @return the edge label for an attribute, built from its qualified property key
     * @throws AtlasBaseException if {@code fromType} is not an entity, struct or classification type
     */
    public static String getAttributeEdgeLabel(AtlasStructType fromType, String attributeName) throws AtlasBaseException {
        return getEdgeLabel(getQualifiedAttributePropertyKey(fromType, attributeName));
    }

    /**
     * @return the qualified attribute name as reported by {@code fromType}
     * @throws AtlasBaseException with {@code UNKNOWN_TYPE} if {@code fromType} is not an
     *                            entity, struct or classification type
     */
    public static String getQualifiedAttributePropertyKey(AtlasStructType fromType, String attributeName) throws AtlasBaseException {
        switch (fromType.getTypeCategory()) {
            case ENTITY:
            case STRUCT:
            case CLASSIFICATION:
                return fromType.getQualifiedAttributeName(attributeName);
            default:
                throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPE, fromType.getTypeCategory().name());
        }
    }

    /** @return {@code true} when the vertex carries both a non-empty guid and a non-empty type name */
    public static boolean isEntityVertex(AtlasVertex vertex) {
        return StringUtils.isNotEmpty(getIdFromVertex(vertex)) && StringUtils.isNotEmpty(getTypeName(vertex));
    }

    /** @return {@code true} when the category of {@code type} is STRUCT, ENTITY or OBJECT_ID_TYPE */
    public static boolean isReference(AtlasType type) {
        return isReference(type.getTypeCategory());
    }

    /** @return {@code true} for STRUCT, ENTITY and OBJECT_ID_TYPE */
    public static boolean isReference(TypeCategory typeCategory) {
        return typeCategory == TypeCategory.STRUCT ||
               typeCategory == TypeCategory.ENTITY ||
               typeCategory == TypeCategory.OBJECT_ID_TYPE;
    }

    /** Delegates to {@code AtlasAttribute.encodePropertyKey}. */
    public static String encodePropertyKey(String key) {
        return AtlasStructType.AtlasAttribute.encodePropertyKey(key);
    }

    /** Delegates to {@code AtlasAttribute.decodePropertyKey}. */
    public static String decodePropertyKey(String key) {
        return AtlasStructType.AtlasAttribute.decodePropertyKey(key);
    }

    /**
     * Adds an additional value to a multi-property.
     *
     * @param vertex       vertex to modify
     * @param propertyName un-encoded property name; it is encoded before use
     * @param value        value to add
     * @return the same vertex, for chaining
     */
    public static AtlasVertex addProperty(AtlasVertex vertex, String propertyName, Object value) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> addProperty({}, {}, {})", toString(vertex), propertyName, value);
        }
        propertyName = encodePropertyKey(propertyName);
        vertex.addProperty(propertyName, value);
        return vertex;
    }

    /**
     * Sets, or removes, a property on a graph element.
     *
     * <p>A {@code null} value or an empty collection removes an existing property. Otherwise the
     * property is written only when the new value differs from the stored one; {@link Date}
     * values are stored as their epoch-millisecond {@code Long}.
     *
     * @param element      element to modify
     * @param propertyName un-encoded property name; it is encoded before use
     * @param value        new value, or {@code null} / empty collection to remove
     */
    public static <T extends AtlasElement> void setProperty(T element, String propertyName, Object value) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> setProperty({}, {}, {})", toString(element), propertyName, value);
        }

        propertyName = encodePropertyKey(propertyName);

        Object existingValue = element.getProperty(propertyName, Object.class);

        if (value == null || (value instanceof Collection && ((Collection) value).isEmpty())) {
            if (existingValue != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Removing property {} from {}", propertyName, toString(element));
                }

                element.removeProperty(propertyName);
            }
        } else {
            if (!value.equals(existingValue)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Setting property {} in {}", propertyName, toString(element));
                }

                if (value instanceof Date) {
                    Long encodedValue = ((Date) value).getTime();
                    element.setProperty(propertyName, encodedValue);
                } else {
                    element.setProperty(propertyName, value);
                }
            }
        }
    }

    /**
     * Reads a property from a graph element.
     *
     * @param element      element to read
     * @param propertyName un-encoded property name; it is encoded before use
     * @param returnType   expected type of the value
     * @return the property value cast to {@code returnType}
     */
    public static <T extends AtlasElement, O> O getProperty(T element, String propertyName, Class<O> returnType) {
        Object property = element.getProperty(encodePropertyKey(propertyName), returnType);
        O      ret      = returnType.cast(property);

        if (LOG.isDebugEnabled()) {
            LOG.debug("getProperty({}, {}) ==> {}", toString(element), propertyName, ret);
        }

        return ret;
    }

    /**
     * Same as {@link #findByUniqueAttributes(AtlasEntityType, Map)} but fails when nothing is found.
     *
     * @throws AtlasBaseException with {@code INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND} when no vertex matches
     */
    public static AtlasVertex getVertexByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
        AtlasVertex vertex = findByUniqueAttributes(entityType, attrValues);

        if (vertex == null) {
            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
                                         attrValues.toString());
        }

        return vertex;
    }

    /**
     * @return the guid of the vertex matching the unique attributes
     * @throws AtlasBaseException when no vertex matches
     */
    public static String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
        AtlasVertex vertexByUniqueAttributes = getVertexByUniqueAttributes(entityType, attrValues);
        return getIdFromVertex(vertexByUniqueAttributes);
    }

    /**
     * Looks for a vertex by trying each unique attribute of the type that has a non-null
     * value in {@code attrValues}, stopping at the first match.
     *
     * <p>Each attempt uses an index query when {@code canUseIndexQuery} allows it; otherwise a
     * graph query by type name, followed by a query by super-type when the type has sub-types.
     *
     * @return the first matching vertex, or {@code null} when none matches
     */
    public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
        AtlasVertex vertex = null;

        final Map<String, AtlasAttribute> uniqueAttributes = entityType.getUniqAttributes();

        if (MapUtils.isNotEmpty(uniqueAttributes) && MapUtils.isNotEmpty(attrValues)) {
            for (AtlasAttribute attribute : uniqueAttributes.values()) {
                Object attrValue = attrValues.get(attribute.getName());

                if (attrValue == null) {
                    continue;
                }

                if (canUseIndexQuery(entityType, attribute.getName())) {
                    vertex = AtlasGraphUtilsV2.getAtlasVertexFromIndexQuery(entityType, attribute, attrValue);
                } else {
                    vertex = AtlasGraphUtilsV2.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);

                    // if no instance of given typeName is found, try to find an instance of type's sub-type
                    if (vertex == null && !entityType.getAllSubTypes().isEmpty()) {
                        vertex = AtlasGraphUtilsV2.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
                    }
                }

                if (vertex != null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("findByUniqueAttributes(type={}, attrName={}, attrValue={}: found vertex {}",
                                  entityType.getTypeName(), attribute.getName(), attrValue, vertex);
                    }

                    break;
                }
            }
        }

        return vertex;
    }

    /** @return the first vertex whose guid property equals {@code guid}, or {@code null} */
    public static AtlasVertex findByGuid(String guid) {
        AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                  .has(Constants.GUID_PROPERTY_KEY, guid);

        Iterator<AtlasVertex> results = query.vertices().iterator();

        return results.hasNext() ? results.next() : null;
    }

    /** @return the type name of the vertex with the given guid; {@code null} for an empty guid or no match */
    public static String getTypeNameFromGuid(String guid) {
        String ret = null;

        if (StringUtils.isNotEmpty(guid)) {
            AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);

            ret = (vertex != null) ? AtlasGraphUtilsV2.getTypeName(vertex) : null;
        }

        return ret;
    }

    /**
     * @return {@code true} when at least one vertex has its type-name property equal to {@code typeName}
     * @throws AtlasBaseException declared for callers; not thrown by the visible code
     */
    public static boolean typeHasInstanceVertex(String typeName) throws AtlasBaseException {
        AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
                                                  .query()
                                                  .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);

        Iterator<AtlasVertex> results = query.vertices().iterator();

        boolean hasInstanceVertex = results != null && results.hasNext();

        if (LOG.isDebugEnabled()) {
            LOG.debug("typeName {} has instance vertex {}", typeName, hasInstanceVertex);
        }

        return hasInstanceVertex;
    }

    /** @return the first ACTIVE vertex of exactly {@code typeName} whose property equals {@code attrVal}, or {@code null} */
    public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
        AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                  .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
                                                  .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
                                                  .has(propertyName, attrVal);

        Iterator<AtlasVertex> results = query.vertices().iterator();

        return results.hasNext() ? results.next() : null;
    }

    /** @return the first ACTIVE vertex having {@code typeName} among its super-types whose property equals {@code attrVal}, or {@code null} */
    public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
        AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                  .has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
                                                  .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
                                                  .has(propertyName, attrVal);

        Iterator<AtlasVertex> results = query.vertices().iterator();

        return results.hasNext() ? results.next() : null;
    }

    /**
     * Collects the guids of all vertices of the given entity type.
     *
     * @param typename  entity type name
     * @param sortOrder when non-null, results are ordered by qualified name in this direction
     * @return the guids, or an immutable empty list when nothing matches
     */
    public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) {
        AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                  .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typename);
        if (sortOrder != null) {
            AtlasGraphQuery.SortOrder qrySortOrder = sortOrder == SortOrder.ASCENDING ? ASC : DESC;

            // keep the returned query: orderBy is a fluent call, so its result is the one to execute
            query = query.orderBy(Constants.QUALIFIED_NAME, qrySortOrder);
        }

        Iterator<AtlasVertex> results = query.vertices().iterator();

        if (!results.hasNext()) {
            return Collections.emptyList();
        }

        List<String> ret = new ArrayList<>();

        while (results.hasNext()) {
            ret.add(getIdFromVertex(results.next()));
        }

        return ret;
    }

    /** Unsorted variant of {@link #findEntityGUIDsByType(String, SortOrder)}. */
    public static List<String> findEntityGUIDsByType(String typename) {
        return findEntityGUIDsByType(typename, null);
    }

    /**
     * @return {@code true} when at least one edge has its type-name property equal to {@code typeName}
     * @throws AtlasBaseException declared for callers; not thrown by the visible code
     */
    public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException {
        AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
                                                  .query()
                                                  .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);

        Iterator<AtlasEdge> results = query.edges().iterator();

        boolean hasInstanceEdges = results != null && results.hasNext();

        if (LOG.isDebugEnabled()) {
            LOG.debug("relationshipType {} has instance edges {}", typeName, hasInstanceEdges);
        }

        return hasInstanceEdges;
    }

    /** Dispatches to the vertex / edge specific formatter, falling back to {@code element.toString()}. */
    private static String toString(AtlasElement element) {
        if (element instanceof AtlasVertex) {
            return toString((AtlasVertex) element);
        } else if (element instanceof AtlasEdge) {
            return toString((AtlasEdge) element);
        }

        return element.toString();
    }

    /** @return a short description of the vertex; more detailed when debug logging is enabled */
    public static String toString(AtlasVertex vertex) {
        if (vertex == null) {
            return "vertex[null]";
        } else {
            if (LOG.isDebugEnabled()) {
                return getVertexDetails(vertex);
            } else {
                return String.format("vertex[id=%s]", vertex.getId().toString());
            }
        }
    }


    /** @return a short description of the edge; more detailed when debug logging is enabled */
    public static String toString(AtlasEdge edge) {
        if (edge == null) {
            return "edge[null]";
        } else {
            if (LOG.isDebugEnabled()) {
                return getEdgeDetails(edge);
            } else {
                return String.format("edge[id=%s]", edge.getId().toString());
            }
        }
    }

    /** @return the vertex id, type name and guid formatted as one string */
    public static String getVertexDetails(AtlasVertex vertex) {
        return String.format("vertex[id=%s type=%s guid=%s]",
                             vertex.getId().toString(), getTypeName(vertex), getIdFromVertex(vertex));
    }

    /** @return the edge id, label and both end vertices formatted as one string */
    public static String getEdgeDetails(AtlasEdge edge) {
        return String.format("edge[id=%s label=%s from %s -> to %s]", edge.getId(), edge.getLabel(),
                             toString(edge.getOutVertex()), toString(edge.getInVertex()));
    }

    /** @return the element's state as an enum, or {@code null} when the state property is absent */
    public static AtlasEntity.Status getState(AtlasElement element) {
        String state = getStateAsString(element);
        return state == null ? null : AtlasEntity.Status.valueOf(state);
    }

    /** @return the raw value of the element's state property */
    public static String getStateAsString(AtlasElement element) {
        return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
    }

    /**
     * Decides whether a unique-attribute lookup may go through the index: the feature flag must
     * be on, the type/sub-types query string must not exceed
     * {@code SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES}, and the qualified attribute name must be
     * among the graph's vertex index keys.
     */
    private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) {
        boolean ret = false;

        if (USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES) {
            final String typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr();

            ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;

            if (ret) {
                Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
                try {
                    ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
                }
                catch (AtlasBaseException ex) {
                    // the attribute name could not be qualified: fall back to the graph query
                    ret = false;
                }
            }
        }

        return ret;
    }

    /**
     * Runs the index query and returns the first hit that is an entity vertex of the expected
     * type (or a sub-type) and, for String values, whose stored value equals the requested one
     * ignoring case.
     *
     * @return the first acceptable vertex, or {@code null}
     */
    private static AtlasVertex getAtlasVertexFromIndexQuery(AtlasEntityType entityType, AtlasAttribute attribute, Object attrVal) {
        String          propertyName = attribute.getVertexPropertyName();
        AtlasIndexQuery query        = getIndexQuery(entityType, propertyName, attrVal.toString());

        for (Iterator<AtlasIndexQuery.Result> iter = query.vertices(); iter.hasNext(); ) {
            AtlasIndexQuery.Result result = iter.next();
            AtlasVertex            vertex = result.getVertex();

            // skip non-entity vertices, if any got returned
            if (vertex == null || !vertex.getPropertyKeys().contains(Constants.GUID_PROPERTY_KEY)) {
                continue;
            }

            // verify the typeName
            String typeNameInVertex = getTypeName(vertex);

            if (!entityType.getTypeAndAllSubTypes().contains(typeNameInVertex)) {
                LOG.warn("incorrect vertex type from index-query: expected='{}'; found='{}'", entityType.getTypeName(), typeNameInVertex);

                continue;
            }

            if (attrVal.getClass() == String.class) {
                String s         = (String) attrVal;
                String vertexVal = vertex.getProperty(propertyName, String.class);

                if (!s.equalsIgnoreCase(vertexVal)) {
                    LOG.warn("incorrect match from index-query for property {}: expected='{}'; found='{}'", propertyName, s, vertexVal);

                    continue;
                }
            }

            return vertex;
        }

        return null;
    }

    /**
     * Builds the index query "type in (type + sub-types) AND property = value AND state = ACTIVE",
     * each term prefixed with the configured index search prefix.
     */
    private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) {
        StringBuilder sb = new StringBuilder();

        sb.append(INDEX_SEARCH_PREFIX + "\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
          .append(" AND ")
          .append(INDEX_SEARCH_PREFIX + "\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
          .append(" AND ")
          .append(INDEX_SEARCH_PREFIX + "\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");

        return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
    }

    /** @return the prefix prepended to every property name in index queries */
    public static String getIndexSearchPrefix() {
        return INDEX_SEARCH_PREFIX;
    }
}