http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java new file mode 100755 index 0000000..a2cac2d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/graph/IAtlasGraphProvider.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.graph; + +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.repository.graphdb.AtlasGraph; + +/** + * Provides a mechanism to control what graph is used in various places. This + * allows the graph to be mocked out during unit testing and be initialized + * lazily. + */ +public interface IAtlasGraphProvider { + + AtlasGraph get() throws RepositoryException; +}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java index 25aa7c5..92e43cb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/SoftDeleteHandler.java @@ -19,8 +19,8 @@ package org.apache.atlas.repository.graph; import com.google.inject.Inject; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.typesystem.persistence.Id; @@ -36,7 +36,7 @@ public class SoftDeleteHandler extends DeleteHandler { } @Override - protected void _deleteVertex(Vertex instanceVertex, boolean force) { + protected void _deleteVertex(AtlasVertex instanceVertex, boolean force) { if (force) { graphHelper.removeVertex(instanceVertex); } else { @@ -50,7 +50,7 @@ public class SoftDeleteHandler extends DeleteHandler { } @Override - protected void deleteEdge(Edge edge, boolean force) throws AtlasException { + protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasException { if (force) { graphHelper.removeEdge(edge); } else { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java deleted file mode 100755 index 7a5e6a9..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.graph; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.repository.graphdb.titan0.Titan0Database; -import org.apache.commons.configuration.Configuration; - -import com.thinkaurelius.titan.core.TitanGraph; - -/** - * Temporary TitanGraphProvider to use until the graph database abstraction - * layer is fully in place. Delegates to the Titan 0.5.4 implementation. This - * will be removed once the abstraction layer is being used. - */ -public class TitanGraphProvider implements GraphProvider<TitanGraph> { - - /* (non-Javadoc) - * @see org.apache.atlas.repository.graph.GraphProvider#get() - */ - @Override - public TitanGraph get() { - return Titan0Database.getGraphInstance(); - } - - public static TitanGraph getGraphInstance() { - return Titan0Database.getGraphInstance(); - } - - public static Configuration getConfiguration() throws AtlasException { - return Titan0Database.getConfiguration(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java index 2e0414e..47ae5e1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java @@ -17,15 +17,26 @@ */ package org.apache.atlas.repository.graph; -import com.google.inject.Inject; -import com.thinkaurelius.titan.core.SchemaViolationException; -import com.tinkerpop.blueprints.Direction; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Vertex; +import static org.apache.atlas.repository.graph.GraphHelper.string; + +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.ITypedInstance; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -48,23 +59,12 @@ import org.apache.atlas.utils.MD5Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.atlas.repository.graph.GraphHelper.string; +import com.google.inject.Inject; public final class TypedInstanceToGraphMapper { private static final Logger LOG = LoggerFactory.getLogger(TypedInstanceToGraphMapper.class); - private final Map<Id, Vertex> idToVertexMap = new HashMap<>(); + private final Map<Id, AtlasVertex> idToVertexMap = new HashMap<>(); private final TypeSystem typeSystem = TypeSystem.getInstance(); private static final GraphHelper graphHelper = GraphHelper.getInstance(); @@ -143,7 +143,7 @@ public final class TypedInstanceToGraphMapper { //new vertex, set all the properties String guid = addOrUpdateAttributesAndTraits(operation, instance); guids.add(guid); - } catch (SchemaViolationException e) { + } catch (AtlasSchemaViolationException e) { throw new EntityExistsException(instance, e); } } @@ -159,7 +159,7 @@ public final class TypedInstanceToGraphMapper { throw new RepositoryException("id cannot be null"); } - Vertex instanceVertex = idToVertexMap.get(id); + AtlasVertex instanceVertex = idToVertexMap.get(id); // add the attributes for the instance ClassType classType = typeSystem.getDataType(ClassType.class, typedInstance.getTypeName()); @@ -174,7 +174,7 @@ public final class TypedInstanceToGraphMapper { return getId(typedInstance)._getId(); } - void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex, + void mapInstanceToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex, Map<String, AttributeInfo> fields, boolean mapOnlyUniqueAttributes, Operation operation) throws AtlasException { @@ -189,7 +189,7 @@ public final class TypedInstanceToGraphMapper { RequestContext.get().getRequestTime()); } - void mapAttributeToVertex(ITypedInstance typedInstance, Vertex instanceVertex, + void mapAttributeToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex, AttributeInfo attributeInfo, Operation operation) throws AtlasException { Object attrValue = typedInstance.get(attributeInfo.name); LOG.debug("Mapping attribute {} = {}", attributeInfo.name, attrValue); @@ -213,11 +213,11 @@ public final class TypedInstanceToGraphMapper { case CLASS: String edgeLabel = graphHelper.getEdgeLabel(typedInstance, attributeInfo); - Edge currentEdge = graphHelper.getEdgeForLabel(instanceVertex, edgeLabel); - String newEdgeId = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(), + AtlasEdge currentEdge = graphHelper.getEdgeForLabel(instanceVertex, edgeLabel); + AtlasEdge newEdge = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(), attrValue, currentEdge, edgeLabel, operation); - if (currentEdge != null && !currentEdge.getId().toString().equals(newEdgeId)) { + if (currentEdge != null && !currentEdge.equals(newEdge)) { deleteHandler.deleteEdgeReference(currentEdge, attributeInfo.dataType().getTypeCategory(), attributeInfo.isComposite, true); } @@ -245,7 +245,7 @@ public final class TypedInstanceToGraphMapper { Id id = instance.getId(); if (!idToVertexMap.containsKey(id)) { - Vertex instanceVertex; + AtlasVertex instanceVertex; if (id.isAssigned()) { // has a GUID LOG.debug("Instance has an assigned id {}", instance.getId()._getId()); instanceVertex = graphHelper.getVertexForGUID(id.id); @@ -291,13 +291,13 @@ public final class TypedInstanceToGraphMapper { private void addFullTextProperty(List<ITypedReferenceableInstance> instances, FullTextMapper fulltextMapper) throws AtlasException { for (ITypedReferenceableInstance typedInstance : instances) { // Traverse - Vertex instanceVertex = getClassVertex(typedInstance); + AtlasVertex instanceVertex = getClassVertex(typedInstance); String fullText = fulltextMapper.mapRecursive(instanceVertex, true); GraphHelper.setProperty(instanceVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText); } } - private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType) + private void addTraits(ITypedReferenceableInstance typedInstance, AtlasVertex instanceVertex, ClassType classType) throws AtlasException { for (String traitName : typedInstance.getTraits()) { LOG.debug("mapping trait {}", traitName); @@ -311,7 +311,7 @@ public final class TypedInstanceToGraphMapper { /******************************************** ARRAY **************************************************/ - private void mapArrayCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex, + private void mapArrayCollectionToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex, AttributeInfo attributeInfo, Operation operation) throws AtlasException { LOG.debug("Mapping instance {} for array attribute {} vertex {}", typedInstance.toShortString(), attributeInfo.name, string(instanceVertex)); @@ -323,16 +323,18 @@ public final class TypedInstanceToGraphMapper { return; } - String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); - List<String> currentElements = GraphHelper.getProperty(instanceVertex, propertyName); IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType(); + String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); + + List<Object> currentElements = GraphHelper.getArrayElementsProperty(elementType, instanceVertex, propertyName); + List<Object> newElementsCreated = new ArrayList<>(); if (!newAttributeEmpty) { if (newElements != null && !newElements.isEmpty()) { int index = 0; for (; index < newElements.size(); index++) { - String currentElement = (currentElements != null && index < currentElements.size()) ? + Object currentElement = (currentElements != null && index < currentElements.size()) ? currentElements.get(index) : null; LOG.debug("Adding/updating element at position {}, current element {}, new element {}", index, currentElement, newElements.get(index)); @@ -343,18 +345,21 @@ public final class TypedInstanceToGraphMapper { } } - List<String> additionalEdges = removeUnusedEntries(instanceVertex, propertyName, currentElements, - newElementsCreated, elementType, attributeInfo); - newElementsCreated.addAll(additionalEdges); + if(GraphHelper.isReference(elementType)) { + + List<AtlasEdge> additionalEdges = removeUnusedEntries(instanceVertex, propertyName, (List)currentElements, + (List)newElementsCreated, elementType, attributeInfo); + newElementsCreated.addAll(additionalEdges); + } // for dereference on way out - GraphHelper.setProperty(instanceVertex, propertyName, newElementsCreated); + GraphHelper.setArrayElementsProperty(elementType, instanceVertex, propertyName, newElementsCreated); } //Removes unused edges from the old collection, compared to the new collection - private List<String> removeUnusedEntries(Vertex instanceVertex, String edgeLabel, - Collection<String> currentEntries, - Collection<Object> newEntries, + private List<AtlasEdge> removeUnusedEntries(AtlasVertex instanceVertex, String edgeLabel, + Collection<AtlasEdge> currentEntries, + Collection<AtlasEdge> newEntries, IDataType entryType, AttributeInfo attributeInfo) throws AtlasException { if (currentEntries != null && !currentEntries.isEmpty()) { LOG.debug("Removing unused entries from the old collection"); @@ -362,20 +367,17 @@ public final class TypedInstanceToGraphMapper { || entryType.getTypeCategory() == DataTypes.TypeCategory.CLASS) { //Remove the edges for (current edges - new edges) - List<String> cloneElements = new ArrayList<>(currentEntries); + List<AtlasEdge> cloneElements = new ArrayList<>(currentEntries); cloneElements.removeAll(newEntries); - List<String> additionalElements = new ArrayList<>(); + List<AtlasEdge> additionalElements = new ArrayList<>(); LOG.debug("Removing unused entries from the old collection - {}", cloneElements); if (!cloneElements.isEmpty()) { - for (String edgeIdForDelete : cloneElements) { - Edge edge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, edgeIdForDelete); - if(edge != null) { - boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(), - attributeInfo.isComposite, true); - if (!deleted) { - additionalElements.add(edgeIdForDelete); - } + for (AtlasEdge edge : cloneElements) { + boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(), + attributeInfo.isComposite, true); + if (!deleted) { + additionalElements.add(edge); } } } @@ -387,7 +389,7 @@ public final class TypedInstanceToGraphMapper { /******************************************** MAP **************************************************/ - private void mapMapCollectionToVertex(ITypedInstance typedInstance, Vertex instanceVertex, + private void mapMapCollectionToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex, AttributeInfo attributeInfo, Operation operation) throws AtlasException { LOG.debug("Mapping instance {} to vertex {} for attribute {}", typedInstance.toShortString(), string(instanceVertex), attributeInfo.name); @@ -402,20 +404,20 @@ public final class TypedInstanceToGraphMapper { IDataType elementType = ((DataTypes.MapType) attributeInfo.dataType()).getValueType(); String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); - Map<String, String> currentMap = new HashMap<>(); + Map<String, Object> currentMap = new HashMap<>(); Map<String, Object> newMap = new HashMap<>(); - List<String> currentKeys = GraphHelper.getProperty(instanceVertex, propertyName); + List<String> currentKeys = GraphHelper.getListProperty(instanceVertex, propertyName); if (currentKeys != null && !currentKeys.isEmpty()) { for (String key : currentKeys) { String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, key); - String propertyValueForKey = GraphHelper.getProperty(instanceVertex, propertyNameForKey).toString(); + Object propertyValueForKey = GraphHelper.getMapValueProperty(elementType, instanceVertex, propertyNameForKey); currentMap.put(key, propertyValueForKey); } } if (!newAttributeEmpty) { - for (Map.Entry entry : newAttribute.entrySet()) { + for (Map.Entry<Object,Object> entry : newAttribute.entrySet()) { String keyStr = entry.getKey().toString(); String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, keyStr); @@ -423,53 +425,53 @@ public final class TypedInstanceToGraphMapper { entry.getValue(), currentMap.get(keyStr), propertyNameForKey, operation); //Add/Update/Remove property value - GraphHelper.setProperty(instanceVertex, propertyNameForKey, newEntry); + GraphHelper.setMapValueProperty(elementType, instanceVertex, propertyNameForKey, newEntry); newMap.put(keyStr, newEntry); } } - Map<String, String> additionalMap = + Map<String, Object> additionalMap = removeUnusedMapEntries(instanceVertex, propertyName, currentMap, newMap, elementType, attributeInfo); Set<String> newKeys = new HashSet<>(newMap.keySet()); newKeys.addAll(additionalMap.keySet()); + // for dereference on way out - GraphHelper.setProperty(instanceVertex, propertyName, new ArrayList<>(newKeys)); + GraphHelper.setListProperty(instanceVertex, propertyName, new ArrayList<>(newKeys)); } //Remove unused entries from map - private Map<String, String> removeUnusedMapEntries(Vertex instanceVertex, String propertyName, - Map<String, String> currentMap, - Map<String, Object> newMap, IDataType elementType, - AttributeInfo attributeInfo) - throws AtlasException { - boolean reference = (elementType.getTypeCategory() == DataTypes.TypeCategory.STRUCT - || elementType.getTypeCategory() == DataTypes.TypeCategory.CLASS); - Map<String, String> additionalMap = new HashMap<>(); - + private Map<String, Object> removeUnusedMapEntries( + AtlasVertex instanceVertex, String propertyName, + Map<String, Object> currentMap, + Map<String, Object> newMap, IDataType elementType, + AttributeInfo attributeInfo) + throws AtlasException { + + Map<String, Object> additionalMap = new HashMap<>(); for (String currentKey : currentMap.keySet()) { + boolean shouldDeleteKey = !newMap.containsKey(currentKey); - if (reference) { - String currentEdge = currentMap.get(currentKey); + if (GraphHelper.isReference(elementType)) { + //Delete the edge reference if its not part of new edges created/updated + AtlasEdge currentEdge = (AtlasEdge)currentMap.get(currentKey); + if (!newMap.values().contains(currentEdge)) { - String edgeLabel = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey); - Edge edge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, currentMap.get(currentKey)); - if(edge != null) { - boolean deleted = - deleteHandler.deleteEdgeReference(edge, elementType.getTypeCategory(), attributeInfo.isComposite, true); - if (!deleted) { - additionalMap.put(currentKey, currentEdge); - shouldDeleteKey = false; - } + + boolean deleted = + deleteHandler.deleteEdgeReference(currentEdge, elementType.getTypeCategory(), attributeInfo.isComposite, true); + if (!deleted) { + additionalMap.put(currentKey, currentEdge); + shouldDeleteKey = false; } } } if (shouldDeleteKey) { String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey); - graphHelper.setProperty(instanceVertex, propertyNameForKey, null); + GraphHelper.setProperty(instanceVertex, propertyNameForKey, null); } } return additionalMap; @@ -477,8 +479,8 @@ public final class TypedInstanceToGraphMapper { /******************************************** ARRAY & MAP **************************************************/ - private Object addOrUpdateCollectionEntry(Vertex instanceVertex, AttributeInfo attributeInfo, - IDataType elementType, Object newAttributeValue, String currentValue, + private Object addOrUpdateCollectionEntry(AtlasVertex instanceVertex, AttributeInfo attributeInfo, + IDataType elementType, Object newAttributeValue, Object currentValue, String propertyName, Operation operation) throws AtlasException { @@ -496,8 +498,7 @@ public final class TypedInstanceToGraphMapper { case STRUCT: case CLASS: final String edgeLabel = GraphHelper.EDGE_LABEL_PREFIX + propertyName; - Edge currentEdge = graphHelper.getEdgeByEdgeId(instanceVertex, edgeLabel, currentValue); - return addOrUpdateReference(instanceVertex, attributeInfo, elementType, newAttributeValue, currentEdge, + return addOrUpdateReference(instanceVertex, attributeInfo, elementType, newAttributeValue, (AtlasEdge)currentValue, edgeLabel, operation); default: @@ -505,8 +506,8 @@ public final class TypedInstanceToGraphMapper { } } - private String addOrUpdateReference(Vertex instanceVertex, AttributeInfo attributeInfo, - IDataType attributeType, Object newAttributeValue, Edge currentEdge, + private AtlasEdge addOrUpdateReference(AtlasVertex instanceVertex, AttributeInfo attributeInfo, + IDataType attributeType, Object newAttributeValue, AtlasEdge currentEdge, String edgeLabel, Operation operation) throws AtlasException { switch (attributeType.getTypeCategory()) { case STRUCT: @@ -523,26 +524,26 @@ public final class TypedInstanceToGraphMapper { } /******************************************** STRUCT **************************************************/ - private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, - ITypedStruct newAttributeValue, Edge currentEdge, - String edgeLabel, Operation operation) throws AtlasException { - String newEdgeId = null; - if (currentEdge != null && newAttributeValue != null) { + + private AtlasEdge addOrUpdateStruct(AtlasVertex instanceVertex, AttributeInfo attributeInfo, + ITypedStruct newAttributeValue, AtlasEdge currentEdge, + String edgeLabel, Operation operation) throws AtlasException { + AtlasEdge newEdge = null; + if (GraphHelper.elementExists(currentEdge) && newAttributeValue != null) { //update updateStructVertex(newAttributeValue, currentEdge, operation); - newEdgeId = currentEdge.getId().toString(); - } else if (currentEdge == null && newAttributeValue != null) { + newEdge = currentEdge; + } else if (! GraphHelper.elementExists(currentEdge) && newAttributeValue != null) { //add - Edge newEdge = addStructVertex(newAttributeValue, instanceVertex, attributeInfo, edgeLabel); - newEdgeId = newEdge.getId().toString(); + newEdge = addStructVertex(newAttributeValue, instanceVertex, attributeInfo, edgeLabel); } - return newEdgeId; + return newEdge; } - private Edge addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, + private AtlasEdge addStructVertex(ITypedStruct structInstance, AtlasVertex instanceVertex, AttributeInfo attributeInfo, String edgeLabel) throws AtlasException { // add a new vertex for the struct or trait instance - Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null, + AtlasVertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null, Collections.<String>emptySet()); // no super types for struct type LOG.debug("created vertex {} for struct {} value {}", string(structInstanceVertex), attributeInfo.name, structInstance.toShortString()); @@ -551,22 +552,22 @@ public final class TypedInstanceToGraphMapper { mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, Operation.CREATE); // add an edge to the newly created vertex from the parent - Edge newEdge = graphHelper.getOrCreateEdge(instanceVertex, structInstanceVertex, edgeLabel); + AtlasEdge newEdge = graphHelper.getOrCreateEdge(instanceVertex, structInstanceVertex, edgeLabel); return newEdge; } - private void updateStructVertex(ITypedStruct newAttributeValue, Edge currentEdge, - Operation operation) throws AtlasException { + private void updateStructVertex(ITypedStruct newAttributeValue, AtlasEdge currentEdge, + Operation operation) throws AtlasException { //Already existing vertex. Update - Vertex structInstanceVertex = currentEdge.getVertex(Direction.IN); + AtlasVertex structInstanceVertex = currentEdge.getInVertex(); LOG.debug("Updating struct vertex {} with struct {}", string(structInstanceVertex), newAttributeValue.toShortString()); // Update attributes final MessageDigest digester = MD5Utils.getDigester(); String newSignature = newAttributeValue.getSignatureHash(digester); - String curSignature = GraphHelper.getProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY); + String curSignature = GraphHelper.getSingleValuedProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.class); if (!newSignature.equals(curSignature)) { //Update struct vertex instance only if there is a change @@ -578,33 +579,34 @@ public final class TypedInstanceToGraphMapper { /******************************************** CLASS **************************************************/ - private String addOrUpdateClassVertex(Vertex instanceVertex, Edge currentEdge, - ITypedReferenceableInstance newAttributeValue, AttributeInfo attributeInfo, - String edgeLabel) throws AtlasException { - Vertex newReferenceVertex = getClassVertex(newAttributeValue); - if(newReferenceVertex == null && newAttributeValue != null) { + private AtlasEdge addOrUpdateClassVertex(AtlasVertex instanceVertex, AtlasEdge currentEdge, + ITypedReferenceableInstance newAttributeValue, AttributeInfo attributeInfo, + String edgeLabel) throws AtlasException { + AtlasVertex newReferenceVertex = getClassVertex(newAttributeValue); + if( ! GraphHelper.elementExists(newReferenceVertex) && newAttributeValue != null) { LOG.error("Could not find vertex for Class Reference " + newAttributeValue); throw new EntityNotFoundException("Could not find vertex for Class Reference " + newAttributeValue); } - String newEdgeId = null; - if (currentEdge != null && newAttributeValue != null) { - newEdgeId = updateClassEdge(instanceVertex, currentEdge, newAttributeValue, newReferenceVertex, + AtlasEdge newEdge = null; + if (GraphHelper.elementExists(currentEdge) && newAttributeValue != null) { + newEdge = updateClassEdge(instanceVertex, currentEdge, newAttributeValue, newReferenceVertex, attributeInfo, edgeLabel); - } else if (currentEdge == null && newAttributeValue != null){ - Edge newEdge = addClassEdge(instanceVertex, newReferenceVertex, edgeLabel); - newEdgeId = newEdge.getId().toString(); + } else if (! GraphHelper.elementExists(currentEdge) && newAttributeValue != null){ + newEdge = addClassEdge(instanceVertex, newReferenceVertex, edgeLabel); + } - return newEdgeId; + return newEdge; } - private Edge addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException { + + private AtlasEdge addClassEdge(AtlasVertex instanceVertex, AtlasVertex toVertex, String edgeLabel) throws AtlasException { // add an edge to the class vertex from the instance return graphHelper.getOrCreateEdge(instanceVertex, toVertex, edgeLabel); } - private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { - Vertex referenceVertex = null; + private AtlasVertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { + AtlasVertex referenceVertex = null; Id id = null; if (typedReference != null) { id = typedReference instanceof Id ? (Id) typedReference : typedReference.getId(); @@ -625,7 +627,7 @@ public final class TypedInstanceToGraphMapper { Id id = typedReference instanceof Id ? (Id) typedReference : typedReference.getId(); if (id.isUnassigned()) { - Vertex classVertex = idToVertexMap.get(id); + AtlasVertex classVertex = idToVertexMap.get(id); String guid = GraphHelper.getIdFromVertex(classVertex); id = new Id(guid, 0, typedReference.getTypeName()); } @@ -633,48 +635,48 @@ public final class TypedInstanceToGraphMapper { } - private String updateClassEdge(Vertex instanceVertex, Edge currentEdge, - ITypedReferenceableInstance newAttributeValue, - Vertex newVertex, AttributeInfo attributeInfo, - String edgeLabel) throws AtlasException { + private AtlasEdge updateClassEdge(AtlasVertex instanceVertex, AtlasEdge currentEdge, + ITypedReferenceableInstance newAttributeValue, + AtlasVertex newVertex, AttributeInfo attributeInfo, + String edgeLabel) throws AtlasException { LOG.debug("Updating {} for reference attribute {}", string(currentEdge), attributeInfo.name); // Update edge if it exists - Vertex currentVertex = currentEdge.getVertex(Direction.IN); + AtlasVertex currentVertex = currentEdge.getInVertex(); String currentEntityId = GraphHelper.getIdFromVertex(currentVertex); String newEntityId = getId(newAttributeValue).id; - String newEdgeId = currentEdge.getId().toString(); + AtlasEdge newEdge = currentEdge; if (!currentEntityId.equals(newEntityId)) { // add an edge to the class vertex from the instance if (newVertex != null) { - Edge newEdge = graphHelper.getOrCreateEdge(instanceVertex, newVertex, edgeLabel); - newEdgeId = newEdge.getId().toString(); + newEdge = graphHelper.getOrCreateEdge(instanceVertex, newVertex, edgeLabel); + } } - return newEdgeId; + return newEdge; } /******************************************** TRAITS ****************************************************/ - void mapTraitInstanceToVertex(ITypedStruct traitInstance, IDataType entityType, Vertex parentInstanceVertex) + void mapTraitInstanceToVertex(ITypedStruct traitInstance, IDataType entityType, AtlasVertex parentInstanceVertex) throws AtlasException { - // add a new vertex for the struct or trait instance + // add a new AtlasVertex for the struct or trait instance final String traitName = traitInstance.getTypeName(); - Vertex traitInstanceVertex = graphHelper.createVertexWithoutIdentity(traitInstance.getTypeName(), null, + AtlasVertex traitInstanceVertex = graphHelper.createVertexWithoutIdentity(traitInstance.getTypeName(), null, typeSystem.getDataType(TraitType.class, traitName).getAllSuperTypeNames()); LOG.debug("created vertex {} for trait {}", string(traitInstanceVertex), traitName); - // map all the attributes to this newly created vertex + // map all the attributes to this newly created AtlasVertex mapInstanceToVertex(traitInstance, traitInstanceVertex, traitInstance.fieldMapping().fields, false, Operation.CREATE); - // add an edge to the newly created vertex from the parent + // add an edge to the newly created AtlasVertex from the parent String relationshipLabel = GraphHelper.getTraitLabel(entityType.getName(), traitName); graphHelper.getOrCreateEdge(parentInstanceVertex, traitInstanceVertex, relationshipLabel); } /******************************************** PRIMITIVES **************************************************/ - private void mapPrimitiveOrEnumToVertex(ITypedInstance typedInstance, Vertex instanceVertex, + private void mapPrimitiveOrEnumToVertex(ITypedInstance typedInstance, AtlasVertex instanceVertex, AttributeInfo attributeInfo) throws AtlasException { Object attrValue = typedInstance.get(attributeInfo.name); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java index b7e706f..ee63061 100755 --- a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java @@ -18,25 +18,29 @@ package org.apache.atlas.repository.typestore; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.thinkaurelius.titan.core.TitanGraph; -import com.tinkerpop.blueprints.Direction; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Vertex; -import org.apache.atlas.AtlasConstants; +import static org.apache.atlas.repository.graph.GraphHelper.setProperty; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + import org.apache.atlas.AtlasException; import org.apache.atlas.GraphTransaction; import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.typesystem.TypesDef; import org.apache.atlas.typesystem.types.AttributeDefinition; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; import org.apache.atlas.typesystem.types.EnumType; import org.apache.atlas.typesystem.types.EnumTypeDefinition; import org.apache.atlas.typesystem.types.EnumValue; @@ -53,13 +57,10 @@ import org.codehaus.jettison.json.JSONException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import static org.apache.atlas.repository.graph.GraphHelper.setProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Inject; +import com.google.inject.Singleton; @Singleton public class GraphBackedTypeStore implements ITypeStore { @@ -69,13 +70,13 @@ public class GraphBackedTypeStore implements ITypeStore { private static Logger LOG = LoggerFactory.getLogger(GraphBackedTypeStore.class); - private final TitanGraph titanGraph; + private final AtlasGraph graph; private GraphHelper graphHelper = GraphHelper.getInstance(); @Inject - public GraphBackedTypeStore(GraphProvider<TitanGraph> graphProvider) { - titanGraph = graphProvider.get(); + public GraphBackedTypeStore() { + graph = AtlasGraphProvider.getGraphInstance(); } @Override @@ -91,14 +92,14 @@ public class GraphBackedTypeStore implements ITypeStore { case STRUCT: StructType structType = (StructType) dataType; - storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), dataType.getDescription(), dataType.getVersion(), + storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), dataType.getDescription(), ImmutableList.copyOf(structType.infoToNameMap.keySet()), ImmutableSet.<String>of()); break; case TRAIT: case CLASS: HierarchicalType type = (HierarchicalType) dataType; - storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), type.getDescription(), type.getVersion(), type.immediateAttrs, + storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), type.getDescription(), type.immediateAttrs, type.superTypes); break; @@ -109,14 +110,14 @@ public class GraphBackedTypeStore implements ITypeStore { } private void storeInGraph(EnumType dataType) { - Vertex vertex = createVertex(dataType.getTypeCategory(), dataType.getName(), dataType.getDescription(), dataType.getVersion()); + AtlasVertex AtlasVertex = createVertex(dataType.getTypeCategory(), dataType.getName(), dataType.getDescription()); List<String> values = new ArrayList<>(dataType.values().size()); for (EnumValue enumValue : dataType.values()) { String key = getPropertyKey(dataType.getName(), enumValue.value); - setProperty(vertex, key, enumValue.ordinal); + setProperty(AtlasVertex, key, enumValue.ordinal); values.add(enumValue.value); } - setProperty(vertex, getPropertyKey(dataType.getName()), values); + setProperty(AtlasVertex, getPropertyKey(dataType.getName()), values); } private String getPropertyKey(String name) { @@ -131,9 +132,9 @@ public class GraphBackedTypeStore implements ITypeStore { return PROPERTY_PREFIX + "edge." + parent + "." + child; } - private void storeInGraph(TypeSystem typeSystem, DataTypes.TypeCategory category, String typeName, String typeDescription, String typeVersion, + private void storeInGraph(TypeSystem typeSystem, DataTypes.TypeCategory category, String typeName, String typeDescription, ImmutableList<AttributeInfo> attributes, ImmutableSet<String> superTypes) throws AtlasException { - Vertex vertex = createVertex(category, typeName, typeDescription, typeVersion); + AtlasVertex vertex = createVertex(category, typeName, typeDescription); List<String> attrNames = new ArrayList<>(); if (attributes != null) { for (AttributeInfo attribute : attributes) { @@ -153,18 +154,18 @@ public class GraphBackedTypeStore implements ITypeStore { if (superTypes != null) { for (String superTypeName : superTypes) { HierarchicalType superType = typeSystem.getDataType(HierarchicalType.class, superTypeName); - Vertex superVertex = createVertex(superType.getTypeCategory(), superTypeName, superType.getDescription(), AtlasConstants.DEFAULT_TYPE_VERSION); + AtlasVertex superVertex = createVertex(superType.getTypeCategory(), superTypeName, superType.getDescription()); graphHelper.getOrCreateEdge(vertex, superVertex, SUPERTYPE_EDGE_LABEL); } } } - private void addReferencesForAttribute(TypeSystem typeSystem, Vertex vertex, AttributeInfo attribute) + private void addReferencesForAttribute(TypeSystem typeSystem, AtlasVertex vertex, AttributeInfo attribute) throws AtlasException { ImmutableList<String> coreTypes = typeSystem.getCoreTypes(); List<IDataType> attrDataTypes = new ArrayList<>(); IDataType attrDataType = attribute.dataType(); - String vertexTypeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY); + String vertexTypeName = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, String.class); switch (attrDataType.getTypeCategory()) { case ARRAY: @@ -201,7 +202,7 @@ public class GraphBackedTypeStore implements ITypeStore { for (IDataType attrType : attrDataTypes) { if (!coreTypes.contains(attrType.getName())) { - Vertex attrVertex = createVertex(attrType.getTypeCategory(), attrType.getName(), attrType.getDescription(), attrType.getVersion()); + AtlasVertex attrVertex = createVertex(attrType.getTypeCategory(), attrType.getName(), attrType.getDescription()); String label = getEdgeLabel(vertexTypeName, attribute.name); graphHelper.getOrCreateEdge(vertex, attrVertex, label); } @@ -213,7 +214,7 @@ public class GraphBackedTypeStore implements ITypeStore { public TypesDef restore() throws AtlasException { //Get all vertices for type system Iterator vertices = - titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator(); + graph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator(); return getTypesFromVertices(vertices); } @@ -221,25 +222,24 @@ public class GraphBackedTypeStore implements ITypeStore { @Override @GraphTransaction public TypesDef restoreType(String typeName) throws AtlasException { - // Get vertex for the specified type name. + // Get AtlasVertex for the specified type name. Iterator vertices = - titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator(); + graph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator(); return getTypesFromVertices(vertices); } - private TypesDef getTypesFromVertices(Iterator vertices) throws AtlasException { + private TypesDef getTypesFromVertices(Iterator<AtlasVertex> vertices) throws AtlasException { ImmutableList.Builder<EnumTypeDefinition> enums = ImmutableList.builder(); ImmutableList.Builder<StructTypeDefinition> structs = ImmutableList.builder(); ImmutableList.Builder<HierarchicalTypeDefinition<ClassType>> classTypes = ImmutableList.builder(); ImmutableList.Builder<HierarchicalTypeDefinition<TraitType>> traits = ImmutableList.builder(); while (vertices.hasNext()) { - Vertex vertex = (Vertex) vertices.next(); - DataTypes.TypeCategory typeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY); - String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY); - String typeDescription = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY); - String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY); + AtlasVertex vertex = vertices.next(); + DataTypes.TypeCategory typeCategory = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); + String typeName = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, String.class); + String typeDescription = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class); LOG.info("Restoring type {}.{}.{}", typeCategory, typeName, typeDescription); switch (typeCategory) { case ENUM: @@ -248,19 +248,19 @@ public class GraphBackedTypeStore implements ITypeStore { case STRUCT: AttributeDefinition[] attributes = getAttributes(vertex, typeName); - structs.add(new StructTypeDefinition(typeName, typeDescription, typeVersion, attributes)); + structs.add(new StructTypeDefinition(typeName, typeDescription, attributes)); break; case CLASS: ImmutableSet<String> superTypes = getSuperTypes(vertex); attributes = getAttributes(vertex, typeName); - classTypes.add(new HierarchicalTypeDefinition(ClassType.class, typeName, typeDescription, typeVersion, superTypes, attributes)); + classTypes.add(new HierarchicalTypeDefinition(ClassType.class, typeName, typeDescription, superTypes, attributes)); break; case TRAIT: superTypes = getSuperTypes(vertex); attributes = getAttributes(vertex, typeName); - traits.add(new HierarchicalTypeDefinition(TraitType.class, typeName, typeDescription, typeVersion, superTypes, attributes)); + traits.add(new HierarchicalTypeDefinition(TraitType.class, typeName, typeDescription, superTypes, attributes)); break; default: @@ -270,37 +270,40 @@ public class GraphBackedTypeStore implements ITypeStore { return TypesUtil.getTypesDef(enums.build(), structs.build(), traits.build(), classTypes.build()); } - private EnumTypeDefinition getEnumType(Vertex vertex) { - String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY); - String typeDescription = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY); - String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY); + private EnumTypeDefinition getEnumType(AtlasVertex vertex) throws AtlasException { + String typeName = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, String.class); + String typeDescription = GraphHelper.getSingleValuedProperty(vertex, Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class); List<EnumValue> enumValues = new ArrayList<>(); - List<String> values = graphHelper.getProperty(vertex, getPropertyKey(typeName)); + List<String> values = vertex.getListProperty(getPropertyKey(typeName)); for (String value : values) { String valueProperty = getPropertyKey(typeName, value); - enumValues.add(new EnumValue(value, (Integer) graphHelper.getProperty(vertex, valueProperty))); + enumValues.add(new EnumValue(value, GraphHelper.getSingleValuedProperty(vertex, valueProperty, Integer.class))); } - return new EnumTypeDefinition(typeName, typeDescription, typeVersion, enumValues.toArray(new EnumValue[enumValues.size()])); + return new EnumTypeDefinition(typeName, typeDescription, enumValues.toArray(new EnumValue[enumValues.size()])); } - private ImmutableSet<String> getSuperTypes(Vertex vertex) { + private ImmutableSet<String> getSuperTypes(AtlasVertex vertex) { Set<String> superTypes = new HashSet<>(); - Iterator<Edge> edges = graphHelper.getOutGoingEdgesByLabel(vertex, SUPERTYPE_EDGE_LABEL); + Iterator<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, SUPERTYPE_EDGE_LABEL).iterator(); while (edges.hasNext()) { - Edge edge = edges.next(); - superTypes.add((String) edge.getVertex(Direction.IN).getProperty(Constants.TYPENAME_PROPERTY_KEY)); + AtlasEdge edge = edges.next(); + superTypes.add(edge.getInVertex().getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class)); } return ImmutableSet.copyOf(superTypes); } - private AttributeDefinition[] getAttributes(Vertex vertex, String typeName) throws AtlasException { + private AttributeDefinition[] getAttributes(AtlasVertex vertex, String typeName) throws AtlasException { List<AttributeDefinition> attributes = new ArrayList<>(); - List<String> attrNames = graphHelper.getProperty(vertex, getPropertyKey(typeName)); + List<String> attrNames = vertex.getListProperty(getPropertyKey(typeName)); if (attrNames != null) { for (String attrName : attrNames) { try { String propertyKey = getPropertyKey(typeName, attrName); - attributes.add(AttributeInfo.fromJson((String) graphHelper.getProperty(vertex, propertyKey))); + AttributeDefinition attrValue = AttributeInfo.fromJson((String) vertex.getJsonProperty(propertyKey)); + if (attrValue != null) + { + attributes.add(attrValue); + } } catch (JSONException e) { throw new AtlasException(e); } @@ -315,24 +318,24 @@ public class GraphBackedTypeStore implements ITypeStore { * @param typeName * @return vertex */ - Vertex findVertex(DataTypes.TypeCategory category, String typeName) { - LOG.debug("Finding vertex for {}.{}", category, typeName); + AtlasVertex findVertex(DataTypes.TypeCategory category, String typeName) { + LOG.debug("Finding AtlasVertex for {}.{}", category, typeName); - Iterator results = titanGraph.query().has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator(); - Vertex vertex = null; + Iterator results = graph.query().has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator(); + AtlasVertex vertex = null; if (results != null && results.hasNext()) { - //There should be just one vertex with the given typeName - vertex = (Vertex) results.next(); + //There should be just one AtlasVertex with the given typeName + vertex = (AtlasVertex) results.next(); } return vertex; } - private Vertex createVertex(DataTypes.TypeCategory category, String typeName, String typeDescription, String typeVersion) { - Vertex vertex = findVertex(category, typeName); + private AtlasVertex createVertex(DataTypes.TypeCategory category, String typeName, String typeDescription) { + AtlasVertex vertex = findVertex(category, typeName); if (vertex == null) { LOG.debug("Adding vertex {}{}", PROPERTY_PREFIX, typeName); - vertex = titanGraph.addVertex(null); - setProperty(vertex, Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); // Mark as type vertex + vertex = graph.addVertex(); + setProperty(vertex, Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); // Mark as type AtlasVertex setProperty(vertex, Constants.TYPE_CATEGORY_PROPERTY_KEY, category); setProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, typeName); } @@ -344,16 +347,6 @@ public class GraphBackedTypeStore implements ITypeStore { } else { LOG.debug(" type description is null "); } - - if (typeVersion != null) { - String oldVersion = getPropertyKey(Constants.TYPEVERSION_PROPERTY_KEY); - if (!typeVersion.equals(oldVersion)) { - setProperty(vertex, Constants.TYPEVERSION_PROPERTY_KEY, typeVersion); - LOG.info(" updating type {} to version {}", typeName, typeVersion); - } - } else { - LOG.info(" type version is null "); - } return vertex; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 026e98c..5b4eb0e 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -18,10 +18,18 @@ package org.apache.atlas.services; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Provider; +import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS; +import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + +import javax.inject.Inject; +import javax.inject.Singleton; + import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; @@ -36,6 +44,7 @@ import org.apache.atlas.query.QueryParser; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -60,29 +69,22 @@ import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.StructTypeDefinition; import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.TypeSystem; -import org.apache.atlas.typesystem.types.ValueConversionException; import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.ParamChecker; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; -import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.Set; -import javax.inject.Inject; -import javax.inject.Singleton; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Provider; + -import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS; -import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; /** * Simple wrapper over TypeSystem and MetadataRepository services with hooks @@ -118,8 +120,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders, TypeSystem.getInstance(), ApplicationProperties.get(), typeCache); } - - DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, + + //for testing only + public DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, final IBootstrapTypesRegistrar typesRegistrar, final Collection<Provider<TypesChangeListener>> typeListenerProviders, final Collection<Provider<EntityChangeListener>> entityListenerProviders, @@ -153,8 +156,6 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang restoreTypeSystem(); } - AtlasPatchHandler.handlePatches(this, typeSystem); - maxAuditResults = configuration.getShort(CONFIG_MAX_AUDIT_RESULTS, DEFAULT_MAX_AUDIT_RESULTS); } @@ -243,6 +244,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang typeDefinition = ParamChecker.notEmpty(typeDefinition, "type definition"); TypesDef typesDef = validateTypeDefinition(typeDefinition); + try { final TypeSystem.TransientTypeSystem transientTypeSystem = typeSystem.createTransientTypeSystem(typesDef, isUpdate); final Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); @@ -334,40 +336,13 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang return guids; } - private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition) - throws AtlasException { - try { - JSONArray referableInstances = new JSONArray(entityInstanceDefinition); - ITypedReferenceableInstance[] instances = new ITypedReferenceableInstance[referableInstances.length()]; - for (int index = 0; index < referableInstances.length(); index++) { - Referenceable entityInstance = - InstanceSerialization.fromJsonReferenceable(referableInstances.getString(index), true); - ITypedReferenceableInstance typedInstrance = getTypedReferenceableInstance(entityInstance); - instances[index] = typedInstrance; - } - return instances; - } catch(ValueConversionException | TypeNotFoundException e) { - throw e; - } catch (Exception e) { // exception from deserializer - LOG.error("Unable to deserialize json={}", entityInstanceDefinition, e); - throw new IllegalArgumentException("Unable to deserialize json", e); - } + private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition) throws AtlasException { + return GraphHelper.deserializeClassInstances(typeSystem, entityInstanceDefinition); } - + @Override public ITypedReferenceableInstance getTypedReferenceableInstance(Referenceable entityInstance) throws AtlasException { - final String entityTypeName = ParamChecker.notEmpty(entityInstance.getTypeName(), "Entity type cannot be null"); - - ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName); - - //Both assigned id and values are required for full update - //classtype.convert() will remove values if id is assigned. So, set temp id, convert and - // then replace with original id - Id origId = entityInstance.getId(); - entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName())); - ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED); - ((ReferenceableInstance)typedInstrance).replaceWithNewId(origId); - return typedInstrance; + return GraphHelper.getTypedReferenceableInstance(typeSystem, entityInstance); } /** http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java new file mode 100644 index 0000000..a270b97 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.util; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; +import org.apache.atlas.repository.graph.DeleteHandler; +import org.apache.atlas.repository.graph.SoftDeleteHandler; +import org.apache.atlas.repository.graphdb.GraphDatabase; +import org.apache.atlas.repository.typestore.GraphBackedTypeStore; +import org.apache.atlas.typesystem.types.cache.DefaultTypeCache; +import org.apache.atlas.typesystem.types.cache.TypeCache; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Atlas configuration for repository project + * + */ +public class AtlasRepositoryConfiguration { + + private static Logger LOG = LoggerFactory.getLogger(AtlasRepositoryConfiguration.class); + + public static final String TYPE_CACHE_IMPLEMENTATION_PROPERTY = "atlas.TypeCache.impl"; + + @SuppressWarnings("unchecked") + public static Class<? extends TypeCache> getTypeCache() { + + // Get the type cache implementation class from Atlas configuration. + try { + Configuration config = ApplicationProperties.get(); + return ApplicationProperties.getClass(config, TYPE_CACHE_IMPLEMENTATION_PROPERTY, + DefaultTypeCache.class.getName(), TypeCache.class); + } catch (AtlasException e) { + LOG.error("Error loading typecache ", e); + return DefaultTypeCache.class; + } + } + private static final String AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY = "atlas.EntityAuditRepository.impl"; + + @SuppressWarnings("unchecked") + public static Class<? extends EntityAuditRepository> getAuditRepositoryImpl() { + try { + Configuration config = ApplicationProperties.get(); + return ApplicationProperties.getClass(config, + AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY, HBaseBasedAuditRepository.class.getName(), EntityAuditRepository.class); + } catch (AtlasException e) { + throw new RuntimeException(e); + } + } + + private static final String DELETE_HANDLER_IMPLEMENTATION_PROPERTY = "atlas.DeleteHandler.impl"; + + @SuppressWarnings("unchecked") + public static Class<? extends DeleteHandler> getDeleteHandlerImpl() { + try { + Configuration config = ApplicationProperties.get(); + return ApplicationProperties.getClass(config, + DELETE_HANDLER_IMPLEMENTATION_PROPERTY, SoftDeleteHandler.class.getName(), DeleteHandler.class); + } catch (AtlasException e) { + throw new RuntimeException(e); + } + } + + private static final String GRAPH_DATABASE_IMPLEMENTATION_PROPERTY = "atlas.graphdb.backend"; + private static final String DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS = "org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase"; + + @SuppressWarnings("unchecked") + public static Class<? extends GraphDatabase> getGraphDatabaseImpl() { + try { + Configuration config = ApplicationProperties.get(); + return ApplicationProperties.getClass(config, + GRAPH_DATABASE_IMPLEMENTATION_PROPERTY, DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS, GraphDatabase.class); + } catch (AtlasException e) { + throw new RuntimeException(e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala old mode 100755 new mode 100644 index c4621cd..569d3f9 --- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala +++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala @@ -20,7 +20,7 @@ package org.apache.atlas.query import java.util -import com.thinkaurelius.titan.core.TitanGraph +import org.apache.atlas.repository.graphdb.AtlasGraph import org.apache.atlas.query.Expressions._ import org.apache.atlas.typesystem.ITypedStruct import org.apache.atlas.typesystem.json.{InstanceSerialization, Serialization} @@ -116,7 +116,7 @@ trait ClosureQuery { def withPath : Boolean def persistenceStrategy: GraphPersistenceStrategies - def g: TitanGraph + def g: AtlasGraph[_,_] def pathExpr : Expressions.Expression = { closureRelation.tail.foldLeft(closureRelation.head.toExpr)((b,a) => b.field(a.toFieldName)) @@ -184,8 +184,8 @@ trait ClosureQuery { * foreach resultRow * for each Path entry * add an entry in the edges Map - * add an entry for the Src Vertex to the vertex Map - * add an entry for the Dest Vertex to the vertex Map + * add an entry for the Src AtlasVertex to the vertex Map + * add an entry for the Dest AtlasVertex to the vertex Map */ res.rows.map(_.asInstanceOf[StructInstance]).foreach { r => @@ -207,7 +207,7 @@ trait ClosureQuery { } currVertex = nextVertex } - val vertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName) + val AtlasVertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName) vertices.put(id(srcVertex), vertexStruct(srcVertex, r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct], s"${SRC_PREFIX}_")) @@ -242,6 +242,7 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery { } } +import scala.language.existentials; /** * A ClosureQuery to compute '''Lineage''' for Hive tables. Assumes the Lineage relation is captured in a ''CTAS'' * type, and the table relations are captured as attributes from a CTAS instance to Table instances. @@ -266,7 +267,7 @@ case class InputLineageClosureQuery(tableTypeName : String, selectAttributes : Option[List[String]], withPath : Boolean, persistenceStrategy: GraphPersistenceStrategies, - g: TitanGraph + g: AtlasGraph[_,_] ) extends SingleInstanceClosureQuery[String] { val closureType : String = tableTypeName @@ -306,7 +307,7 @@ case class OutputLineageClosureQuery(tableTypeName : String, selectAttributes : Option[List[String]], withPath : Boolean, persistenceStrategy: GraphPersistenceStrategies, - g: TitanGraph + g: AtlasGraph[_,_] ) extends SingleInstanceClosureQuery[String] { val closureType : String = tableTypeName
