http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java new file mode 100644 index 0000000..129bb55 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java @@ -0,0 +1,611 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.authorize.AtlasTypeAccessRequest; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; +import org.apache.atlas.v1.model.typedef.AttributeDefinition; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; +import org.apache.atlas.v1.model.typedef.Multiplicity; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * StructDef store in v1 format. 
+ */ +public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDef> { + private static final Logger LOG = LoggerFactory.getLogger(AtlasStructDefStoreV2.class); + + public AtlasStructDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) { + super(typeDefStore, typeRegistry); + } + + @Override + public AtlasVertex preCreate(AtlasStructDef structDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.preCreate({})", structDef); + } + + validateType(structDef); + + AtlasType type = typeRegistry.getType(structDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.STRUCT) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, structDef.getName(), TypeCategory.STRUCT.name()); + } + + AtlasVertex ret = typeDefStore.findTypeVertexByName(structDef.getName()); + + if (ret != null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, structDef.getName()); + } + + ret = typeDefStore.createTypeVertex(structDef); + + AtlasStructDefStoreV2.updateVertexPreCreate(structDef, (AtlasStructType)type, ret, typeDefStore); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.preCreate({}): {}", structDef, ret); + } + + return ret; + } + + @Override + public AtlasStructDef create(AtlasStructDef structDef, AtlasVertex preCreateResult) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.create({}, {})", structDef, preCreateResult); + } + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, structDef), "create struct-def ", structDef.getName()); + + if (CollectionUtils.isEmpty(structDef.getAttributeDefs())) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Missing attributes for structdef"); + } + + AtlasVertex vertex = (preCreateResult == null) ? 
preCreate(structDef) : preCreateResult; + + AtlasStructDefStoreV2.updateVertexAddReferences(structDef, vertex, typeDefStore); + + AtlasStructDef ret = toStructDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.create({}, {}): {}", structDef, preCreateResult, ret); + } + + return ret; + } + + @Override + public List<AtlasStructDef> getAll() throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.getAll()"); + } + + List<AtlasStructDef> ret = new ArrayList<>(); + + Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.STRUCT); + while (vertices.hasNext()) { + ret.add(toStructDef(vertices.next())); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.getAll(): count={}", ret.size()); + } + return ret; + } + + @Override + public AtlasStructDef getByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.getByName({})", name); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.STRUCT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, String.class); + + AtlasStructDef ret = toStructDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.getByName({}): {}", name, ret); + } + + return ret; + } + + @Override + public AtlasStructDef getByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.getByGuid({})", guid); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.STRUCT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + AtlasStructDef ret = toStructDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.getByGuid({}): {}", 
guid, ret); + } + + return ret; + } + + @Override + public AtlasStructDef update(AtlasStructDef structDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.update({})", structDef); + } + + validateType(structDef); + + AtlasStructDef ret = StringUtils.isNotBlank(structDef.getGuid()) ? updateByGuid(structDef.getGuid(), structDef) + : updateByName(structDef.getName(), structDef); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.update({}): {}", structDef, ret); + } + + return ret; + } + + @Override + public AtlasStructDef updateByName(String name, AtlasStructDef structDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.updateByName({}, {})", name, structDef); + } + + AtlasStructDef existingDef = typeRegistry.getStructDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update struct-def ", name); + + validateType(structDef); + + AtlasType type = typeRegistry.getType(structDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.STRUCT) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, structDef.getName(), TypeCategory.STRUCT.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.STRUCT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + AtlasStructDefStoreV2.updateVertexPreUpdate(structDef, (AtlasStructType)type, vertex, typeDefStore); + AtlasStructDefStoreV2.updateVertexAddReferences(structDef, vertex, typeDefStore); + + AtlasStructDef ret = toStructDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.updateByName({}, {}): {}", name, structDef, ret); + } + + return ret; + } + + @Override + public AtlasStructDef updateByGuid(String guid, AtlasStructDef structDef) throws AtlasBaseException { + if 
(LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.updateByGuid({})", guid); + } + + AtlasStructDef existingDef = typeRegistry.getStructDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update struct-def ", (existingDef != null ? existingDef.getName() : guid)); + + validateType(structDef); + + AtlasType type = typeRegistry.getTypeByGuid(guid); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.STRUCT) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, structDef.getName(), TypeCategory.STRUCT.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.STRUCT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + AtlasStructDefStoreV2.updateVertexPreUpdate(structDef, (AtlasStructType)type, vertex, typeDefStore); + AtlasStructDefStoreV2.updateVertexAddReferences(structDef, vertex, typeDefStore); + + AtlasStructDef ret = toStructDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.updateByGuid({}): {}", guid, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.preDeleteByName({})", name); + } + + AtlasStructDef existingDef = typeRegistry.getStructDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete struct-def ", name); + + AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.STRUCT); + + if (AtlasGraphUtilsV2.typeHasInstanceVertex(name)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name); + } + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if 
(LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.preDeleteByName({}): {}", name, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasStructDefStoreV1.preDeleteByGuid({})", guid); + } + + AtlasStructDef existingDef = typeRegistry.getStructDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete struct-def ", (existingDef != null ? existingDef.getName() : guid)); + + AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.STRUCT); + + String typeName = AtlasGraphUtilsV2.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class); + + if (AtlasGraphUtilsV2.typeHasInstanceVertex(typeName)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName); + } + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.preDeleteByGuid({}): {}", guid, ret); + } + + return ret; + } + + private AtlasStructDef toStructDef(AtlasVertex vertex) throws AtlasBaseException { + AtlasStructDef ret = null; + + if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.STRUCT)) { + ret = toStructDef(vertex, new AtlasStructDef(), typeDefStore); + } + + return ret; + } + + public static void updateVertexPreCreate(AtlasStructDef structDef, AtlasStructType structType, + AtlasVertex vertex, AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException { + List<String> attrNames = new ArrayList<>(structDef.getAttributeDefs().size()); + + for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { + // Validate the mandatory features of an attribute (compatibility with legacy type system) + if (StringUtils.isEmpty(attributeDef.getName())) { + throw new 
AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "name"); + } + if (StringUtils.isEmpty(attributeDef.getTypeName())) { + throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "typeName"); + } + + String propertyKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef, attributeDef.getName()); + + AtlasGraphUtilsV2.setProperty(vertex, propertyKey, toJsonFromAttribute(structType.getAttribute(attributeDef.getName()))); + + attrNames.add(attributeDef.getName()); + } + AtlasGraphUtilsV2.setProperty(vertex, AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef), attrNames); + } + + public static void updateVertexPreUpdate(AtlasStructDef structDef, AtlasStructType structType, + AtlasVertex vertex, AtlasTypeDefGraphStoreV2 typeDefStore) + throws AtlasBaseException { + + List<String> attrNames = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(structDef.getAttributeDefs())) { + for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { + attrNames.add(attributeDef.getName()); + } + } + + List<String> currAttrNames = vertex.getProperty(AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef), List.class); + + // delete attributes that are not present in updated structDef + if (CollectionUtils.isNotEmpty(currAttrNames)) { + for (String currAttrName : currAttrNames) { + if (!attrNames.contains(currAttrName)) { + throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_DELETION_NOT_SUPPORTED, + structDef.getName(), currAttrName); + } + } + } + + typeDefStore.updateTypeVertex(structDef, vertex); + + // Load up current struct definition for matching attributes + AtlasStructDef currentStructDef = toStructDef(vertex, new AtlasStructDef(), typeDefStore); + + // add/update attributes that are present in updated structDef + if (CollectionUtils.isNotEmpty(structDef.getAttributeDefs())) { + for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { + if (CollectionUtils.isEmpty(currAttrNames) || 
!currAttrNames.contains(attributeDef.getName())) { + // new attribute - only allow if optional + if (!attributeDef.getIsOptional()) { + throw new AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE, structDef.getName(), attributeDef.getName()); + } + } + + // Validate the mandatory features of an attribute (compatibility with legacy type system) + if (StringUtils.isEmpty(attributeDef.getName())) { + throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "name"); + } + if (StringUtils.isEmpty(attributeDef.getTypeName())) { + throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "typeName"); + } + + AtlasAttributeDef existingAttribute = currentStructDef.getAttribute(attributeDef.getName()); + if (null != existingAttribute && !attributeDef.getTypeName().equals(existingAttribute.getTypeName())) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Data type update for attribute is not supported"); + } + + String propertyKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef, attributeDef.getName()); + + AtlasGraphUtilsV2.setProperty(vertex, propertyKey, toJsonFromAttribute(structType.getAttribute(attributeDef.getName()))); + } + } + + AtlasGraphUtilsV2.setProperty(vertex, AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef), attrNames); + } + + public static void updateVertexAddReferences(AtlasStructDef structDef, AtlasVertex vertex, + AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException { + for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { + addReferencesForAttribute(vertex, attributeDef, typeDefStore); + } + } + + public static AtlasStructDef toStructDef(AtlasVertex vertex, AtlasStructDef structDef, + AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException { + AtlasStructDef ret = (structDef != null) ? 
structDef :new AtlasStructDef(); + + typeDefStore.vertexToTypeDef(vertex, ret); + + List<AtlasAttributeDef> attributeDefs = new ArrayList<>(); + List<String> attrNames = vertex.getProperty(AtlasGraphUtilsV2.getTypeDefPropertyKey(ret), List.class); + + if (CollectionUtils.isNotEmpty(attrNames)) { + for (String attrName : attrNames) { + String propertyKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(ret, attrName); + String attribJson = vertex.getProperty(GraphHelper.encodePropertyKey(propertyKey), String.class); + + attributeDefs.add(toAttributeDefFromJson(structDef, AtlasType.fromJson(attribJson, Map.class), + typeDefStore)); + } + } + ret.setAttributeDefs(attributeDefs); + + return ret; + } + + private static void addReferencesForAttribute(AtlasVertex vertex, AtlasAttributeDef attributeDef, + AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException { + Set<String> referencedTypeNames = AtlasTypeUtil.getReferencedTypeNames(attributeDef.getTypeName()); + + String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); + + for (String referencedTypeName : referencedTypeNames) { + if (!AtlasTypeUtil.isBuiltInType(referencedTypeName)) { + AtlasVertex referencedTypeVertex = typeDefStore.findTypeVertexByName(referencedTypeName); + + if (referencedTypeVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPE, referencedTypeName, typeName, attributeDef.getName()); + } + + String label = AtlasGraphUtilsV2.getEdgeLabel(typeName, attributeDef.getName()); + + typeDefStore.getOrCreateEdge(vertex, referencedTypeVertex, label); + } + } + } + + @VisibleForTesting + public static String toJsonFromAttribute(AtlasAttribute attribute) { + AtlasAttributeDef attributeDef = attribute.getAttributeDef(); + Map<String, Object> attribInfo = new HashMap<>(); + + attribInfo.put("name", attributeDef.getName()); + attribInfo.put("dataType", attributeDef.getTypeName()); + attribInfo.put("isUnique", attributeDef.getIsUnique()); + 
attribInfo.put("isIndexable", attributeDef.getIsIndexable()); + attribInfo.put("includeInNotification", attributeDef.getIncludeInNotification()); + attribInfo.put("isComposite", attribute.isOwnedRef()); + attribInfo.put("reverseAttributeName", attribute.getInverseRefAttributeName()); + attribInfo.put("defaultValue", attributeDef.getDefaultValue()); + attribInfo.put("description", attributeDef.getDescription()); + + final int lower; + final int upper; + + if (attributeDef.getCardinality() == AtlasAttributeDef.Cardinality.SINGLE) { + lower = attributeDef.getIsOptional() ? 0 : 1; + upper = 1; + } else { + if(attributeDef.getIsOptional()) { + lower = 0; + } else { + lower = attributeDef.getValuesMinCount() < 1 ? 1 : attributeDef.getValuesMinCount(); + } + + upper = attributeDef.getValuesMaxCount() < 2 ? Integer.MAX_VALUE : attributeDef.getValuesMaxCount(); + } + + Map<String, Object> multiplicity = new HashMap<>(); + multiplicity.put("lower", lower); + multiplicity.put("upper", upper); + multiplicity.put("isUnique", AtlasAttributeDef.Cardinality.SET.equals(attributeDef.getCardinality())); + + attribInfo.put("multiplicity", AtlasType.toJson(multiplicity)); + + return AtlasType.toJson(attribInfo); + } + + @VisibleForTesting + public static AtlasAttributeDef toAttributeDefFromJson(AtlasStructDef structDef, + Map attribInfo, + AtlasTypeDefGraphStoreV2 typeDefStore) + throws AtlasBaseException { + AtlasAttributeDef ret = new AtlasAttributeDef(); + + ret.setName((String) attribInfo.get("name")); + ret.setTypeName((String) attribInfo.get("dataType")); + ret.setIsUnique((Boolean) attribInfo.get("isUnique")); + ret.setIsIndexable((Boolean) attribInfo.get("isIndexable")); + ret.setIncludeInNotification((Boolean) attribInfo.get("includeInNotification")); + ret.setDefaultValue((String) attribInfo.get("defaultValue")); + ret.setDescription((String) attribInfo.get("description")); + + if ((Boolean)attribInfo.get("isComposite")) { + ret.addConstraint(new 
AtlasConstraintDef(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)); + } + + final String reverseAttributeName = (String) attribInfo.get("reverseAttributeName"); + if (StringUtils.isNotBlank(reverseAttributeName)) { + ret.addConstraint(new AtlasConstraintDef(AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF, + new HashMap<String, Object>() {{ + put(AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE, reverseAttributeName); + }})); + } + + Map multiplicity = AtlasType.fromJson((String) attribInfo.get("multiplicity"), Map.class); + Number minCount = (Number) multiplicity.get("lower"); + Number maxCount = (Number) multiplicity.get("upper"); + Boolean isUnique = (Boolean) multiplicity.get("isUnique"); + + if (minCount == null || minCount.intValue() == 0) { + ret.setIsOptional(true); + ret.setValuesMinCount(0); + } else { + ret.setIsOptional(false); + ret.setValuesMinCount(minCount.intValue()); + } + + if (maxCount == null || maxCount.intValue() < 2) { + ret.setCardinality(AtlasAttributeDef.Cardinality.SINGLE); + ret.setValuesMaxCount(1); + } else { + if (isUnique == null || isUnique == Boolean.FALSE) { + ret.setCardinality(AtlasAttributeDef.Cardinality.LIST); + } else { + ret.setCardinality(AtlasAttributeDef.Cardinality.SET); + } + + ret.setValuesMaxCount(maxCount.intValue()); + } + + return ret; + } + + public static Multiplicity getMultiplicity(AtlasAttributeDef attributeDef) { + final int lower; + final int upper; + final boolean isUnique = AtlasAttributeDef.Cardinality.SET.equals(attributeDef.getCardinality()); + + if (attributeDef.getCardinality() == AtlasAttributeDef.Cardinality.SINGLE) { + lower = attributeDef.getIsOptional() ? 0 : 1; + upper = 1; + } else { + if(attributeDef.getIsOptional()) { + lower = 0; + } else { + lower = attributeDef.getValuesMinCount() < 1 ? 1 : attributeDef.getValuesMinCount(); + } + + upper = attributeDef.getValuesMaxCount() < 2 ? 
Integer.MAX_VALUE : attributeDef.getValuesMaxCount(); + } + + Multiplicity ret = new Multiplicity(lower, upper, isUnique); + + return ret; + } + + public static AttributeDefinition toAttributeDefinition(AtlasAttribute attribute) { + final AtlasAttributeDef attrDef = attribute.getAttributeDef(); + + AttributeDefinition ret = new AttributeDefinition(); + + ret.setName(attrDef.getName()); + ret.setDataTypeName(attrDef.getTypeName()); + ret.setMultiplicity(getMultiplicity(attrDef)); + ret.setIsComposite(attribute.isOwnedRef()); + ret.setIsUnique(attrDef.getIsUnique()); + ret.setIsIndexable(attrDef.getIsIndexable()); + ret.setReverseAttributeName(attribute.getInverseRefAttributeName()); + ret.setDescription(attrDef.getDescription()); + ret.setDefaultValue(attrDef.getDefaultValue()); + + return ret; + } +}
http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java new file mode 100644 index 0000000..bcdc7a8 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java @@ -0,0 +1,539 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import static org.apache.atlas.repository.Constants.TYPE_CATEGORY_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.VERTEX_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.VERTEX_TYPE; + +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.annotation.GraphTransaction; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.TypeDefChangeListener; +import org.apache.atlas.model.typedef.*; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.*; +import org.apache.atlas.repository.store.graph.*; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import javax.inject.Singleton; + + +/** + * Graph persistence store for TypeDef - v1 + */ +@Singleton +@Component +public class AtlasTypeDefGraphStoreV2 extends AtlasTypeDefGraphStore { + private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStoreV2.class); + + protected final AtlasGraph atlasGraph; + + @Inject + public AtlasTypeDefGraphStoreV2(AtlasTypeRegistry typeRegistry, + Set<TypeDefChangeListener> typeDefChangeListeners, + AtlasGraph atlasGraph) { + super(typeRegistry, typeDefChangeListeners); + this.atlasGraph = atlasGraph; + + LOG.debug("<== 
AtlasTypeDefGraphStoreV1()"); + } + + @Override + protected AtlasDefStore<AtlasEnumDef> getEnumDefStore(AtlasTypeRegistry typeRegistry) { + return new AtlasEnumDefStoreV2(this, typeRegistry); + } + + @Override + protected AtlasDefStore<AtlasStructDef> getStructDefStore(AtlasTypeRegistry typeRegistry) { + return new AtlasStructDefStoreV2(this, typeRegistry); + } + + @Override + protected AtlasDefStore<AtlasClassificationDef> getClassificationDefStore(AtlasTypeRegistry typeRegistry) { + return new AtlasClassificationDefStoreV2(this, typeRegistry); + } + + @Override + protected AtlasDefStore<AtlasEntityDef> getEntityDefStore(AtlasTypeRegistry typeRegistry) { + return new AtlasEntityDefStoreV2(this, typeRegistry); + } + + @Override + protected AtlasDefStore<AtlasRelationshipDef> getRelationshipDefStore(AtlasTypeRegistry typeRegistry) { + return new AtlasRelationshipDefStoreV2(this, typeRegistry); + } + + + @Override + @GraphTransaction + public void init() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStoreV1.init()"); + + super.init(); + + LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); + } + + AtlasGraph getAtlasGraph() { return atlasGraph; } + + @VisibleForTesting + public AtlasVertex findTypeVertexByName(String typeName) { + Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) + .has(Constants.TYPENAME_PROPERTY_KEY, typeName) + .vertices().iterator(); + + return (results != null && results.hasNext()) ? (AtlasVertex) results.next() : null; + } + + AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) { + Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) + .has(Constants.TYPENAME_PROPERTY_KEY, typeName) + .has(TYPE_CATEGORY_PROPERTY_KEY, category) + .vertices().iterator(); + + return (results != null && results.hasNext()) ? 
(AtlasVertex) results.next() : null; + } + + AtlasVertex findTypeVertexByGuid(String typeGuid) { + Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) + .has(Constants.GUID_PROPERTY_KEY, typeGuid) + .vertices().iterator(); + + return (vertices != null && vertices.hasNext()) ? vertices.next() : null; + } + + AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) { + Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) + .has(Constants.GUID_PROPERTY_KEY, typeGuid) + .has(TYPE_CATEGORY_PROPERTY_KEY, category) + .vertices().iterator(); + + return (vertices != null && vertices.hasNext()) ? vertices.next() : null; + } + + Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) { + + return (Iterator<AtlasVertex>) atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) + .has(TYPE_CATEGORY_PROPERTY_KEY, category) + .vertices().iterator(); + } + + AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) { + // Validate all the required checks + Preconditions.checkArgument(StringUtils.isNotBlank(typeDef.getName()), "Type name can't be null/empty"); + + AtlasVertex ret = atlasGraph.addVertex(); + + if (StringUtils.isBlank(typeDef.getTypeVersion())) { + typeDef.setTypeVersion("1.0"); + } + + if (typeDef.getVersion() == null) { + typeDef.setVersion(1L); + } + + if (StringUtils.isBlank(typeDef.getGuid())) { + typeDef.setGuid(UUID.randomUUID().toString()); + } + + if (typeDef.getCreateTime() == null) { + typeDef.setCreateTime(new Date()); + } + + if (typeDef.getUpdateTime() == null) { + typeDef.setUpdateTime(new Date()); + } + + ret.setProperty(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); // Mark as type vertex + ret.setProperty(TYPE_CATEGORY_PROPERTY_KEY, getTypeCategory(typeDef)); + + ret.setProperty(Constants.TYPENAME_PROPERTY_KEY, typeDef.getName()); + ret.setProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, + 
StringUtils.isNotBlank(typeDef.getDescription()) ? typeDef.getDescription() : typeDef.getName()); + ret.setProperty(Constants.TYPEVERSION_PROPERTY_KEY, typeDef.getTypeVersion()); + ret.setProperty(Constants.GUID_PROPERTY_KEY, typeDef.getGuid()); + ret.setProperty(Constants.CREATED_BY_KEY, getCurrentUser()); + ret.setProperty(Constants.TIMESTAMP_PROPERTY_KEY, typeDef.getCreateTime().getTime()); + ret.setProperty(Constants.MODIFIED_BY_KEY, getCurrentUser()); + ret.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, typeDef.getUpdateTime().getTime()); + ret.setProperty(Constants.VERSION_PROPERTY_KEY, typeDef.getVersion()); + ret.setProperty(Constants.TYPEOPTIONS_PROPERTY_KEY, AtlasType.toJson(typeDef.getOptions())); + + return ret; + } + + void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) { + if (!isTypeVertex(vertex)) { + LOG.warn("updateTypeVertex(): not a type-vertex - {}", vertex); + + return; + } + + updateVertexProperty(vertex, Constants.GUID_PROPERTY_KEY, typeDef.getGuid()); + /* + * rename of a type is supported yet - as the typename is used to in the name of the edges from this vertex + * To support rename of types, he edge names should be derived from an internal-name - not directly the typename + * + updateVertexProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, typeDef.getName()); + */ + updateVertexProperty(vertex, Constants.TYPEDESCRIPTION_PROPERTY_KEY, typeDef.getDescription()); + updateVertexProperty(vertex, Constants.TYPEVERSION_PROPERTY_KEY, typeDef.getTypeVersion()); + updateVertexProperty(vertex, Constants.TYPEOPTIONS_PROPERTY_KEY, AtlasType.toJson(typeDef.getOptions())); + + markVertexUpdated(vertex); + } + + void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException { + Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT); + + for (AtlasEdge edge : edges) { + atlasGraph.removeEdge(edge); + } + } + + /** + * Look to see if there are any IN edges with the supplied label + * @param vertex + * 
@param label + * @return + * @throws AtlasBaseException + */ + boolean hasIncomingEdgesWithLabel(AtlasVertex vertex, String label) throws AtlasBaseException { + boolean foundEdges = false; + Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator(); + + while (inEdges.hasNext()) { + AtlasEdge edge = inEdges.next(); + + if (label.equals(edge.getLabel())) { + foundEdges = true; + break; + } + } + return foundEdges; + } + + void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException { + Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator(); + + if (inEdges.hasNext()) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES); + } + + Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT); + + for (AtlasEdge edge : edges) { + atlasGraph.removeEdge(edge); + } + atlasGraph.removeVertex(vertex); + } + + void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) { + String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); + String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class); + String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class); + String guid = vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class); + String createdBy = vertex.getProperty(Constants.CREATED_BY_KEY, String.class); + String updatedBy = vertex.getProperty(Constants.MODIFIED_BY_KEY, String.class); + Long createTime = vertex.getProperty(Constants.TIMESTAMP_PROPERTY_KEY, Long.class); + Long updateTime = vertex.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class); + Object versionObj = vertex.getProperty(Constants.VERSION_PROPERTY_KEY, Object.class); + String options = vertex.getProperty(Constants.TYPEOPTIONS_PROPERTY_KEY, String.class); + + Long version = null; + + if(versionObj instanceof Number) { + version = ((Number)versionObj).longValue(); + } else if (versionObj != null) { + version = 
Long.valueOf(versionObj.toString()); + } else { + version = Long.valueOf(0); + } + + + typeDef.setName(name); + typeDef.setDescription(description); + typeDef.setTypeVersion(typeVersion); + typeDef.setGuid(guid); + typeDef.setCreatedBy(createdBy); + typeDef.setUpdatedBy(updatedBy); + + if (createTime != null) { + typeDef.setCreateTime(new Date(createTime)); + } + + if (updateTime != null) { + typeDef.setUpdateTime(new Date(updateTime)); + } + + if (version != null) { + typeDef.setVersion(version); + } + + if (options != null) { + typeDef.setOptions(AtlasType.fromJson(options, Map.class)); + } + } + + boolean isTypeVertex(AtlasVertex vertex) { + String vertexType = vertex.getProperty(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class); + + return VERTEX_TYPE.equals(vertexType); + } + + @VisibleForTesting + public boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) { + boolean ret = false; + + if (isTypeVertex(vertex)) { + Object objTypeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, Object.class); + + TypeCategory vertexCategory = null; + + if(objTypeCategory instanceof TypeCategory) { + vertexCategory = (TypeCategory) objTypeCategory; + } else if (objTypeCategory != null) { + vertexCategory = TypeCategory.valueOf(objTypeCategory.toString()); + } + + ret = category.equals(vertexCategory); + } + + return ret; + } + + boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) { + boolean ret = false; + + if (isTypeVertex(vertex)) { + TypeCategory vertexCategory = vertex.getProperty(TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); + + for (TypeCategory category : categories) { + if (category.equals(vertexCategory)) { + ret = true; + + break; + } + } + } + + return ret; + } + + AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { + AtlasEdge ret = null; + Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); + + for (AtlasEdge edge : edges) { + if 
(edge.getInVertex().getId().equals(inVertex.getId())) { + ret = edge; + break; + } + } + + if (ret == null) { + ret = addEdge(outVertex, inVertex, edgeLabel); + } + + return ret; + } + + AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { + + return atlasGraph.addEdge(outVertex, inVertex, edgeLabel); + } + + void removeEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { + Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); + + for (AtlasEdge edge : edges) { + if (edge.getInVertex().getId().equals(inVertex.getId())) { + atlasGraph.removeEdge(edge); + } + } + } + + void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory) + throws AtlasBaseException { + Set<String> currentSuperTypes = getSuperTypeNames(vertex); + + if (CollectionUtils.isNotEmpty(superTypes)) { + if (! superTypes.containsAll(currentSuperTypes)) { + throw new AtlasBaseException(AtlasErrorCode.SUPERTYPE_REMOVAL_NOT_SUPPORTED); + } + + for (String superType : superTypes) { + AtlasVertex superTypeVertex = findTypeVertexByNameAndCategory(superType, typeCategory); + + getOrCreateEdge(vertex, superTypeVertex, AtlasGraphUtilsV2.SUPERTYPE_EDGE_LABEL); + } + } else if (CollectionUtils.isNotEmpty(currentSuperTypes)) { + throw new AtlasBaseException(AtlasErrorCode.SUPERTYPE_REMOVAL_NOT_SUPPORTED); + } + } + + public void createEntityTypeEdges(AtlasVertex classificationVertex, Set<String> entityTypes) throws AtlasBaseException { + Set<String> currentEntityTypes = getEntityTypeNames(classificationVertex); + String classificationTypeName = classificationVertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); + + if (CollectionUtils.isNotEmpty(entityTypes)) { + if (!entityTypes.containsAll(currentEntityTypes)) { + throw new AtlasBaseException(AtlasErrorCode.ENTITYTYPE_REMOVAL_NOT_SUPPORTED, classificationTypeName); + } + + for (String entityType : entityTypes) { + AtlasVertex 
entityTypeVertex = findTypeVertexByNameAndCategory(entityType, TypeCategory.CLASS); + if (entityTypeVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATIONDEF_INVALID_ENTITYTYPES, classificationTypeName,entityType); + + } + getOrCreateEdge(classificationVertex, entityTypeVertex, AtlasGraphUtilsV2.ENTITYTYPE_EDGE_LABEL); + } + } else if (CollectionUtils.isNotEmpty(currentEntityTypes)) { // remove the restrictions, if present + for (String entityType : currentEntityTypes) { + AtlasVertex entityTypeVertex = findTypeVertexByNameAndCategory(entityType, TypeCategory.CLASS); + + if (entityTypeVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATIONDEF_INVALID_ENTITYTYPES, classificationTypeName,entityType); + + } + + removeEdge(classificationVertex, entityTypeVertex, AtlasGraphUtilsV2.ENTITYTYPE_EDGE_LABEL); + } + + } + } + + Set<String> getSuperTypeNames(AtlasVertex vertex) { + return getTypeNamesFromEdges(vertex, AtlasGraphUtilsV2.SUPERTYPE_EDGE_LABEL); + } + + Set<String> getEntityTypeNames(AtlasVertex vertex) { + return getTypeNamesFromEdges(vertex, AtlasGraphUtilsV2.ENTITYTYPE_EDGE_LABEL); + } + + /** + * Get the typename properties from the edges, that are associated with the vertex and have the supplied edge label. 
+ * @param vertex + * @param edgeLabel + * @return set of type names + */ + private Set<String> getTypeNamesFromEdges(AtlasVertex vertex,String edgeLabel) { + Set<String> ret = new HashSet<>(); + Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); + + for (AtlasEdge edge : edges) { + ret.add(edge.getInVertex().getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class)); + } + + return ret; + } + + TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) { + switch (typeDef.getCategory()) { + case ENTITY: + return TypeCategory.CLASS; + + case CLASSIFICATION: + return TypeCategory.TRAIT; + + case STRUCT: + return TypeCategory.STRUCT; + + case ENUM: + return TypeCategory.ENUM; + + case RELATIONSHIP: + return TypeCategory.RELATIONSHIP; + } + + return null; + } + + /* + * update the given vertex property, if the new value is not-blank + */ + private void updateVertexProperty(AtlasVertex vertex, String propertyName, String newValue) { + if (StringUtils.isNotBlank(newValue)) { + String currValue = vertex.getProperty(propertyName, String.class); + + if (!StringUtils.equals(currValue, newValue)) { + vertex.setProperty(propertyName, newValue); + } + } + } + + /* + * update the given vertex property, if the new value is not-null + */ + private void updateVertexProperty(AtlasVertex vertex, String propertyName, Date newValue) { + if (newValue != null) { + Number currValue = vertex.getProperty(propertyName, Number.class); + + if (currValue == null || !currValue.equals(newValue.getTime())) { + vertex.setProperty(propertyName, newValue.getTime()); + } + } + } + + /* + * increment the version value for this vertex + */ + private void markVertexUpdated(AtlasVertex vertex) { + Date now = new Date(); + Number currVersion = vertex.getProperty(Constants.VERSION_PROPERTY_KEY, Number.class); + long newVersion = currVersion == null ? 
1 : (currVersion.longValue() + 1); + + vertex.setProperty(Constants.MODIFIED_BY_KEY, getCurrentUser()); + vertex.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, now.getTime()); + vertex.setProperty(Constants.VERSION_PROPERTY_KEY, newVersion); + } + + private String getCurrentUser() { + String ret = RequestContext.get().getUser(); + + if (StringUtils.isBlank(ret)) { + ret = System.getProperty("user.name"); + + if (StringUtils.isBlank(ret)) { + ret = "atlas"; + } + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AttributeMutationContext.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AttributeMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AttributeMutationContext.java new file mode 100644 index 0000000..442b9ec --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AttributeMutationContext.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2; + + +import org.apache.atlas.model.instance.EntityMutations.EntityOperation; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; + + +import java.util.Objects; + +public class AttributeMutationContext { + private EntityOperation op; + /** + * Atlas Attribute + */ + + private AtlasAttribute attribute; + + /** + * Overriding type for which elements are being mapped + */ + private AtlasType currentElementType; + + /** + * Current attribute value/entity/Struct instance + */ + private Object value; + + private String vertexProperty; + + /** + * + * The vertex which corresponds to the entity/struct for which we are mapping a complex attributes like struct, traits + */ + AtlasVertex referringVertex; + + /** + * The current edge(in case of updates) from the parent entity/struct to the complex attribute like struct, trait + */ + AtlasEdge existingEdge; + + public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value) { + this(op, referringVertex, attribute, value, attribute.getVertexPropertyName(), null, null); + } + + public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value, + String vertexProperty, AtlasType currentElementType, AtlasEdge currentEdge) { + this.op = op; + this.referringVertex = referringVertex; + this.attribute = attribute; + this.value = value; + this.vertexProperty = vertexProperty; + this.currentElementType = currentElementType; + this.existingEdge = currentEdge; + } + + @Override + public int hashCode() { + return Objects.hash(op, 
referringVertex, attribute, value, vertexProperty, currentElementType, existingEdge); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } else if (obj == this) { + return true; + } else if (obj.getClass() != getClass()) { + return false; + } else { + AttributeMutationContext rhs = (AttributeMutationContext) obj; + return Objects.equals(op, rhs.op) + && Objects.equals(referringVertex, rhs.referringVertex) + && Objects.equals(attribute, rhs.attribute) + && Objects.equals(value, rhs.value) + && Objects.equals(vertexProperty, rhs.vertexProperty) + && Objects.equals(currentElementType, rhs.currentElementType) + && Objects.equals(existingEdge, rhs.existingEdge); + } + } + + + public AtlasStructType getParentType() { + return attribute.getDefinedInType(); + } + + public AtlasStructDef getStructDef() { + return attribute.getDefinedInDef(); + } + + public AtlasAttributeDef getAttributeDef() { + return attribute.getAttributeDef(); + } + + public AtlasType getAttrType() { + return currentElementType == null ? 
attribute.getAttributeType() : currentElementType; + } + + public AtlasType getCurrentElementType() { + return currentElementType; + } + + public Object getValue() { + return value; + } + + public String getVertexProperty() { return vertexProperty; } + + public AtlasVertex getReferringVertex() { return referringVertex; } + + public AtlasEdge getCurrentEdge() { + return existingEdge; + } + + public void setElementType(final AtlasType attrType) { + this.currentElementType = attrType; + } + + public AtlasAttribute getAttribute() { + return attribute; + } + + public EntityOperation getOp() { + return op; + } + + public void setExistingEdge(AtlasEdge existingEdge) { this.existingEdge = existingEdge; } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java new file mode 100644 index 0000000..12a40a1 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
* You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.repository.store.graph.v2;

import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Imports the entities of an {@link EntityImportStream} one at a time through the entity store.
 * Entities whose import fails with {@code INVALID_OBJECT_ID} are remembered in a residual list
 * and attempted again after the stream is exhausted.
 */
@Component
public class BulkImporterImpl implements BulkImporter {
    // the logger is named after this class, so that import messages are attributed to the importer
    private static final Logger LOG = LoggerFactory.getLogger(BulkImporterImpl.class);

    private final AtlasEntityStore entityStore;

    @Inject
    public BulkImporterImpl(AtlasEntityStore entityStore) {
        this.entityStore = entityStore;
    }

    /**
     * Imports every entity of the stream and records metrics and processed GUIDs in the import result.
     *
     * @param entityStream stream of entities to import; must have at least one element
     * @param importResult result object that receives metrics and the processed entity GUIDs
     * @return response carrying the GUID assignments collected from all individual imports
     * @throws AtlasBaseException with {@code INVALID_PARAMETERS} if the stream is {@code null} or empty,
     *                            or any import failure other than {@code INVALID_OBJECT_ID}
     */
    @Override
    public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> bulkImport()");
        }

        if (entityStream == null || !entityStream.hasNext()) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
        }

        EntityMutationResponse ret = new EntityMutationResponse();
        ret.setGuidAssignments(new HashMap<String, String>());

        Set<String>  processedGuids = new HashSet<>();
        float        currentPercent = 0f;
        List<String> residualList   = new ArrayList<>();

        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);

        while (entityImportStreamWithResidualList.hasNext()) {
            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;

            if (entity == null) {
                continue;
            }

            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);

            try {
                EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);

                if (resp.getGuidAssignments() != null) {
                    ret.getGuidAssignments().putAll(resp.getGuidAssignments());
                }

                currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(), entityImportStreamWithResidualList.getStreamSize(), currentPercent);

                entityStream.onImportComplete(entity.getGuid());
            } catch (AtlasBaseException e) {
                // NOTE(review): an entity that keeps failing with INVALID_OBJECT_ID is re-queued on every
                // attempt, which looks like it could loop without end - confirm whether a retry bound is needed
                if (!updateResidualList(e, residualList, entity.getGuid())) {
                    throw e;
                }
            } catch (Throwable e) {
                AtlasBaseException abe = new AtlasBaseException(e);

                if (!updateResidualList(abe, residualList, entity.getGuid())) {
                    throw abe;
                }
            } finally {
                RequestContext.clear();
            }
        }

        importResult.getProcessedEntities().addAll(processedGuids);
        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());

        return ret;
    }


    /**
     * Queues the GUID for a later attempt when the failure is an {@code INVALID_OBJECT_ID} error.
     *
     * @param e           failure raised while importing the entity
     * @param lineageList residual list the GUID is appended to
     * @param guid        GUID of the entity that failed
     * @return {@code true} if the GUID was queued, {@code false} if the error has a different code
     */
    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
            return false;
        }

        lineageList.add(guid);

        return true;
    }

    /**
     * Updates the created/updated/deleted counters for one imported entity and logs the progress.
     *
     * @return the progress value to pass into the next call
     */
    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
                                      EntityMutationResponse resp,
                                      AtlasImportResult importResult,
                                      Set<String> processedGuids,
                                      int currentIndex, int streamSize, float currentPercent) {
        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);

        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());

        return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
    }

    /**
     * Computes the import progress and logs it whenever it exceeds the previously reported value.
     * The percentage is a whole number (integer division, rounded down).
     *
     * @param log            logger the progress line is written to
     * @param currentIndex   number of entities handled so far
     * @param streamSize     total number of entities; must not be 0
     * @param currentPercent value returned by the previous call
     * @param additionalInfo text appended to the progress line
     * @return the computed percentage when {@code streamSize} is above 100; otherwise
     *         {@code currentPercent}, incremented by one if a progress line was logged
     * @throws ArithmeticException if {@code streamSize} is 0
     */
    @VisibleForTesting
    static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
        final int MAX_PERCENT = 100;

        // multiply in long so that a large index cannot overflow int; the division stays integral
        float   percent   = (float) (((long) currentIndex * MAX_PERCENT) / streamSize);
        // Double.compare() yields only the sign of the comparison, so it is tested against 0
        boolean updateLog = Double.compare(percent, currentPercent) > 0;

        float updatedPercent;

        if (MAX_PERCENT < streamSize) {
            updatedPercent = percent;
        } else if (updateLog) {
            updatedPercent = currentPercent + 1;
        } else {
            updatedPercent = currentPercent;
        }

        if (updateLog) {
            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
        }

        return updatedPercent;
    }

    /**
     * Increments the metric named by {@code prefix} (formatted with the type name) once for every
     * entity header whose GUID has not been processed yet, and records those GUIDs as processed.
     */
    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
        if (list == null) {
            return;
        }

        for (AtlasEntityHeader h : list) {
            // add() returns false for a GUID that was already counted
            if (processedGuids.add(h.getGuid())) {
                importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
            }
        }
    }

    /**
     * Iterates over the entities of the wrapped stream first and, once the stream is exhausted,
     * over the entities whose GUIDs were put into the residual list.
     */
    private static class EntityImportStreamWithResidualList {
        private final EntityImportStream stream;
        private final List<String>       residualList;
        private boolean                  navigateResidualList;
        private int                      currentResidualListIndex;


        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
            this.stream                   = stream;
            this.residualList             = residualList;
            this.navigateResidualList     = false;
            this.currentResidualListIndex = 0;
        }

        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
            if (navigateResidualList) {
                // reposition the stream at the next queued GUID before reading from it
                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
            }

            return stream.getNextEntityWithExtInfo();
        }

        public boolean hasNext() {
            if (!navigateResidualList) {
                if (stream.hasNext()) {
                    return true;
                }

                // the stream is exhausted: switch over to the residual list from now on
                navigateResidualList = true;
            }

            return currentResidualListIndex < residualList.size();
        }

        public int getStreamSize() {
            return stream.size() + residualList.size();
        }
    }
}