Repository: incubator-atlas Updated Branches: refs/heads/branch-0.6-incubating dad90970d -> e1f05fa7f
ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/e1f05fa7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/e1f05fa7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/e1f05fa7 Branch: refs/heads/branch-0.6-incubating Commit: e1f05fa7fbca95ae428790f3f7b6afdca6c1bb2d Parents: dad9097 Author: Shwetha GS <sshivalingamur...@hortonworks.com> Authored: Tue Dec 15 14:58:40 2015 +0530 Committer: Shwetha GS <sshivalingamur...@hortonworks.com> Committed: Tue Dec 15 14:58:40 2015 +0530 ---------------------------------------------------------------------- release-log.txt | 1 + .../atlas/repository/MetadataRepository.java | 11 +- .../graph/GraphBackedMetadataRepository.java | 13 ++- .../atlas/repository/graph/GraphHelper.java | 8 +- .../graph/TypedInstanceToGraphMapper.java | 100 +++++++++++-------- .../atlas/services/DefaultMetadataService.java | 80 ++++++++------- .../apache/atlas/BaseHiveRepositoryTest.java | 7 +- .../GraphBackedMetadataRepositoryTest.java | 27 +++-- .../graph/GraphRepoMapperScaleTest.java | 2 +- .../service/DefaultMetadataServiceTest.java | 9 +- .../apache/atlas/services/MetadataService.java | 17 ++-- .../atlas/typesystem/types/TypeUtils.java | 10 +- .../atlas/web/resources/EntityResource.java | 8 +- .../notification/EntityNotificationIT.java | 58 +---------- .../atlas/web/resources/BaseResourceIT.java | 54 +++++++++- .../web/resources/EntityJerseyResourceIT.java | 52 +++++++++- 16 files changed, 268 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 64dc568..2124965 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags) ALL CHANGES: +ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags) ATLAS-386 Handle hive rename Table (shwethags) ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemath via sumasai) ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown(yhemath via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java index 2091e71..f66a4e5 100755 --- a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java @@ -19,12 +19,13 @@ package org.apache.atlas.repository; import org.apache.atlas.AtlasException; -import org.apache.atlas.typesystem.exception.EntityExistsException; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.ITypedStruct; +import org.apache.atlas.typesystem.exception.EntityExistsException; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.TypeUtils; import java.util.List; @@ -82,7 +83,7 @@ public interface MetadataRepository { * @throws RepositoryException * @throws EntityExistsException */ - String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException; + List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException; /** * Fetch the complete definition of an entity given its GUID. @@ -143,13 +144,13 @@ public interface MetadataRepository { * Adds/Updates the property to the entity that corresponds to the GUID * Supports only primitive attribute/Class Id updations. */ - void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException; + TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException; /** * Adds the property to the entity that corresponds to the GUID * @param entitiesToBeUpdated The entities to be updated */ - String[] updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException; + TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException; /** * Returns the entity for the given type and qualified name http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index fe1e576..dd64124 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java @@ -37,6 +37,7 @@ import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.typesystem.types.TypeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,12 +115,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction - public String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, + public List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException { LOG.info("adding entities={}", entities); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper); - return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities); + TypeUtils.Pair<List<String>, List<String>> idPair = + instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities); + return idPair.left; } catch (EntityExistsException e) { throw e; } catch (AtlasException e) { @@ -279,7 +282,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction - public String[] updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException { + public TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException { LOG.info("updating entity {}", entitiesUpdated); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper); @@ -292,11 +295,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction - public void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException { + public TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException { LOG.info("updating entity {}", entity); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper); - instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity); + return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity); } catch (AtlasException e) { throw new RepositoryException(e); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 9ac2819..6b2d5d1 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -71,13 +71,13 @@ public final class GraphHelper { return INSTANCE; } - public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance, - Set<String> superTypeNames) { + public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance, Set<String> superTypeNames) { + final String guid = UUID.randomUUID().toString(); + final Vertex vertexWithIdentity = createVertexWithoutIdentity(typedInstance.getTypeName(), - typedInstance.getId(), superTypeNames); + new Id(guid, 0 , typedInstance.getTypeName()), superTypeNames); // add identity - final String guid = UUID.randomUUID().toString(); setProperty(vertexWithIdentity, Constants.GUID_PROPERTY_KEY, guid); // add version information http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java index 7ef5c50..996f31b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java @@ -41,8 +41,8 @@ import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.ObjectGraphWalker; import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.typesystem.types.TypeUtils; import org.apache.atlas.utils.MD5Utils; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,36 +79,40 @@ public final class TypedInstanceToGraphMapper { this.graphToTypedInstanceMapper = graphToTypedInstanceMapper; } - String[] mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances) + TypeUtils.Pair<List<String>, List<String>> mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances) throws AtlasException { - List<String> guids = new ArrayList<>(); + + List<String> createdIds = new ArrayList<>(); + List<String> updatedIds = new ArrayList<>(); + for (ITypedReferenceableInstance typedInstance : typedInstances) { Collection<IReferenceableInstance> newInstances = walkClassInstances(typedInstance); - Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair = + TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair = createVerticesAndDiscoverInstances(newInstances); switch (operation) { case CREATE: - addOrUpdateAttributesAndTraits(operation, instancesPair.getLeft()); - addFullTextProperty(instancesPair.getLeft()); + List<String> ids = addOrUpdateAttributesAndTraits(operation, instancesPair.left); + createdIds.addAll(ids); + addFullTextProperty(instancesPair.left); break; case UPDATE_FULL: case UPDATE_PARTIAL: - List<ITypedReferenceableInstance> instancesForUpdate = instancesPair.getLeft(); - instancesForUpdate.addAll(instancesPair.getRight()); - addOrUpdateAttributesAndTraits(operation, instancesForUpdate); - addFullTextProperty(instancesForUpdate); + ids = addOrUpdateAttributesAndTraits(Operation.CREATE, instancesPair.left); + createdIds.addAll(ids); + ids = addOrUpdateAttributesAndTraits(operation, instancesPair.right); + updatedIds.addAll(ids); + + addFullTextProperty(instancesPair.left); + addFullTextProperty(instancesPair.right); break; case DELETE: throw new UnsupportedOperationException("Not handled - " + operation); } - - //Return guid for - addToGuids(typedInstance, guids); } - return guids.toArray(new String[guids.size()]); + return TypeUtils.Pair.of(createdIds, updatedIds); } private Collection<IReferenceableInstance> walkClassInstances(ITypedReferenceableInstance typedInstance) @@ -126,18 +130,21 @@ public final class TypedInstanceToGraphMapper { return entityProcessor.getInstances(); } - private void addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException { + private List<String> addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException { + List<String> guids = new ArrayList<>(); for (ITypedReferenceableInstance instance : instances) { try { //new vertex, set all the properties - addOrUpdateAttributesAndTraits(operation, instance); + String guid = addOrUpdateAttributesAndTraits(operation, instance); + guids.add(guid); } catch (SchemaViolationException e) { throw new EntityExistsException(instance, e); } } + return guids; } - private void addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance) + private String addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance) throws AtlasException { LOG.debug("Adding/Updating typed instance {}", typedInstance.getTypeName()); @@ -158,6 +165,8 @@ public final class TypedInstanceToGraphMapper { //TODO - Handle Trait updates addTraits(typedInstance, instanceVertex, classType); } + + return getId(typedInstance)._getId(); } private void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex, @@ -215,14 +224,16 @@ public final class TypedInstanceToGraphMapper { } } - private Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances( + private TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances( Collection<IReferenceableInstance> instances) throws AtlasException { List<ITypedReferenceableInstance> instancesToCreate = new ArrayList<>(); List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>(); for (IReferenceableInstance instance : instances) { + ITypedReferenceableInstance newInstance; Id id = instance.getId(); + if (!idToVertexMap.containsKey(id)) { Vertex instanceVertex; if (id.isAssigned()) { // has a GUID @@ -231,7 +242,9 @@ public final class TypedInstanceToGraphMapper { throw new IllegalStateException( String.format("%s is not of type ITypedReferenceableInstance", instance)); } - instancesToUpdate.add((ITypedReferenceableInstance) instance); + newInstance = (ITypedReferenceableInstance) instance; + instancesToUpdate.add(newInstance); + } else { //Check if there is already an instance with the same unique attribute value ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName()); @@ -239,31 +252,28 @@ public final class TypedInstanceToGraphMapper { //no entity with the given unique attribute, create new if (instanceVertex == null) { - ITypedReferenceableInstance newInstance = classType.convert(instance, Multiplicity.REQUIRED); + newInstance = classType.convert(instance, Multiplicity.REQUIRED); instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames()); instancesToCreate.add(newInstance); //Map only unique attributes for cases of circular references mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE); + } else { if (!(instance instanceof ReferenceableInstance)) { throw new IllegalStateException( String.format("%s is not of type ITypedReferenceableInstance", instance)); } - instancesToUpdate.add((ITypedReferenceableInstance) instance); + newInstance = (ITypedReferenceableInstance) instance; + instancesToUpdate.add(newInstance); } } + //Set the id in the new instance idToVertexMap.put(id, instanceVertex); } } - return Pair.of(instancesToCreate, instancesToUpdate); - } - - private void addToGuids(ITypedReferenceableInstance typedInstance, List<String> guids) { - Vertex instanceVertex = idToVertexMap.get(typedInstance.getId()); - String guid = instanceVertex.getProperty(Constants.GUID_PROPERTY_KEY); - guids.add(guid); + return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate); } private void addFullTextProperty(List<ITypedReferenceableInstance> instances) throws AtlasException { @@ -275,7 +285,8 @@ public final class TypedInstanceToGraphMapper { } } - private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType) throws AtlasException { + private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType) + throws AtlasException { for (String traitName : typedInstance.getTraits()) { LOG.debug("mapping trait {}", traitName); GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, traitName); @@ -288,7 +299,8 @@ public final class TypedInstanceToGraphMapper { /******************************************** STRUCT **************************************************/ - private Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge, Operation operation) throws AtlasException { + private TypeUtils.Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge, + Operation operation) throws AtlasException { //Already existing vertex. Update Vertex structInstanceVertex = relEdge.getVertex(Direction.IN); @@ -303,10 +315,11 @@ public final class TypedInstanceToGraphMapper { mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, operation); GraphHelper.setProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.valueOf(newSignature)); } - return Pair.of(structInstanceVertex, relEdge); + return TypeUtils.Pair.of(structInstanceVertex, relEdge); } - private Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, AttributeInfo attributeInfo, String edgeLabel) throws AtlasException { + private TypeUtils.Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, + AttributeInfo attributeInfo, String edgeLabel) throws AtlasException { // add a new vertex for the struct or trait instance Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null, Collections.<String>emptySet()); // no super types for struct type @@ -317,7 +330,7 @@ public final class TypedInstanceToGraphMapper { // add an edge to the newly created vertex from the parent Edge relEdge = graphHelper.addEdge(instanceVertex, structInstanceVertex, edgeLabel); - return Pair.of(structInstanceVertex, relEdge); + return TypeUtils.Pair.of(structInstanceVertex, relEdge); } /******************************************** ARRAY **************************************************/ @@ -443,7 +456,7 @@ public final class TypedInstanceToGraphMapper { private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType, ITypedStruct structAttr, String curVal, String edgeLabel, Operation operation) throws AtlasException { - Pair<Vertex, Edge> vertexEdgePair = null; + TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null; if (curVal != null && structAttr == null) { //remove edge removeUnusedReference(curVal, attributeInfo, elementType); @@ -456,7 +469,7 @@ public final class TypedInstanceToGraphMapper { vertexEdgePair = addStructVertex(structAttr, instanceVertex, attributeInfo, edgeLabel); } - return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null; + return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null; } private String addOrUpdateClassVertex(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType, @@ -468,27 +481,28 @@ public final class TypedInstanceToGraphMapper { throw new EntityNotFoundException("Could not find vertex for Class Reference " + newVal); } - Pair<Vertex, Edge> vertexEdgePair = null; + TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null; if (curVal != null && newVal == null) { //remove edge removeUnusedReference(curVal, attributeInfo, elementType); } else if (curVal != null && newVal != null) { Edge edge = graphHelper.getOutGoingEdgeById(curVal); Id classRefId = getId(newVal); - vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo, elementType, edgeLabel, operation); + vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo, + elementType, edgeLabel, operation); } else if (newVal != null){ vertexEdgePair = addClassEdge(instanceVertex, toVertex, edgeLabel); } - return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null; + return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null; } /******************************************** CLASS **************************************************/ - private Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException { + private TypeUtils.Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException { // add an edge to the class vertex from the instance Edge edge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel); - return Pair.of(toVertex, edge); + return TypeUtils.Pair.of(toVertex, edge); } private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { @@ -521,11 +535,11 @@ public final class TypedInstanceToGraphMapper { } - private Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance, + private TypeUtils.Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance, Vertex instanceVertex, Edge edge, Vertex toVertex, AttributeInfo attributeInfo, IDataType dataType, String edgeLabel, Operation operation) throws AtlasException { - Pair<Vertex, Edge> result = Pair.of(toVertex, edge); + TypeUtils.Pair<Vertex, Edge> result = TypeUtils.Pair.of(toVertex, edge); Edge newEdge = edge; // Update edge if it exists Vertex invertex = edge.getVertex(Direction.IN); @@ -535,7 +549,7 @@ public final class TypedInstanceToGraphMapper { // add an edge to the class vertex from the instance if(toVertex != null) { newEdge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel); - result = Pair.of(toVertex, newEdge); + result = TypeUtils.Pair.of(toVertex, newEdge); } removeUnusedReference(edge.getId().toString(), attributeInfo, dataType); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index f605c26..b38face 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -23,16 +23,12 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Provider; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; -import org.apache.atlas.repository.RepositoryException; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.typesystem.exception.TypeNotFoundException; -import org.apache.atlas.typesystem.persistence.ReferenceableInstance; -import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.classification.InterfaceAudience; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.IndexCreationException; import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -40,9 +36,12 @@ import org.apache.atlas.typesystem.ITypedStruct; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.typesystem.exception.TypeNotFoundException; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.typesystem.json.TypesSerialization; import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.persistence.ReferenceableInstance; import org.apache.atlas.typesystem.types.AttributeDefinition; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.ClassType; @@ -54,25 +53,24 @@ import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.StructTypeDefinition; import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.typesystem.types.TypeUtils; import org.apache.atlas.typesystem.types.ValueConversionException; import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.apache.atlas.utils.ParamChecker; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.actors.threadpool.Arrays; import javax.inject.Inject; import javax.inject.Singleton; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** * Simple wrapper over TypeSystem and MetadataRepository services with hooks @@ -279,17 +277,10 @@ public class DefaultMetadataService implements MetadataService { ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition); - final String[] guids = repository.createEntities(typedInstances); - - Set<ITypedReferenceableInstance> entitites = new HashSet<>(); + final List<String> guids = repository.createEntities(typedInstances); - for (String guid : guids) { - entitites.add(repository.getEntityDefinition(guid)); - } - - onEntitiesAdded(entitites); - - return new JSONArray(Arrays.asList(guids)).toString(); + onEntitiesAdded(guids); + return new JSONArray(guids).toString(); } private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition) @@ -390,14 +381,20 @@ public class DefaultMetadataService implements MetadataService { ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition cannot be empty"); ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition); - String[] guids = repository.updateEntities(typedInstances); - onEntitiesAdded(Arrays.asList(typedInstances)); + TypeUtils.Pair<List<String>, List<String>> guids = repository.updateEntities(typedInstances); + return onEntitiesAddedUpdated(guids); + } + + private String onEntitiesAddedUpdated(TypeUtils.Pair<List<String>, List<String>> guids) throws AtlasException { + onEntitiesAdded(guids.left); + onEntitiesUpdated(guids.right); - return new JSONArray(Arrays.asList(guids)).toString(); + guids.left.addAll(guids.right); + return new JSONArray(guids.left).toString(); } @Override - public void updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException { + public String updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException { ParamChecker.notEmpty(guid, "guid cannot be null"); ParamChecker.notEmpty(attributeName, "property cannot be null"); ParamChecker.notEmpty(value, "property value cannot be null"); @@ -426,10 +423,8 @@ public class DefaultMetadataService implements MetadataService { } ((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName())); - repository.updatePartial(newInstance); - onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{ - add(repository.getEntityDefinition(guid)); - }}); + TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance); + return onEntitiesAddedUpdated(guids); } private ITypedReferenceableInstance validateEntityExists(String guid) @@ -442,7 +437,7 @@ public class DefaultMetadataService implements MetadataService { } @Override - public void updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException { + public String updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException { ParamChecker.notEmpty(guid, "guid cannot be null"); ParamChecker.notNull(newEntity, "updatedEntity cannot be null"); ITypedReferenceableInstance existInstance = validateEntityExists(guid); @@ -450,10 +445,8 @@ public class DefaultMetadataService implements MetadataService { ITypedReferenceableInstance newInstance = convertToTypedInstance(newEntity, existInstance.getTypeName()); ((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName())); - repository.updatePartial(newInstance); - onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{ - add(repository.getEntityDefinition(guid)); - }}); + TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance); + return onEntitiesAddedUpdated(guids); } private ITypedReferenceableInstance convertToTypedInstance(Referenceable updatedEntity, String typeName) throws AtlasException { @@ -511,13 +504,8 @@ public class DefaultMetadataService implements MetadataService { final ITypedReferenceableInstance newInstance = convertToTypedInstance(updatedEntity, typeName); ((ReferenceableInstance)newInstance).replaceWithNewId(oldInstance.getId()); - repository.updatePartial(newInstance); - - onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{ - add(newInstance); - }}); - - return newInstance.getId()._getId(); + TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance); + return onEntitiesAddedUpdated(guids); } private void validateTypeExists(String entityType) throws AtlasException { @@ -633,12 +621,22 @@ public class DefaultMetadataService implements MetadataService { } } - private void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException { + private void onEntitiesAdded(List<String> guids) throws AtlasException { + List<ITypedReferenceableInstance> entities = loadEntities(guids); for (EntityChangeListener listener : entityChangeListeners) { listener.onEntitiesAdded(entities); } } + private List<ITypedReferenceableInstance> loadEntities(List<String> guids) throws EntityNotFoundException, + RepositoryException { + List<ITypedReferenceableInstance> entities = new ArrayList<>(); + for (String guid : guids) { + entities.add(repository.getEntityDefinition(guid)); + } + return entities; + } + private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException { Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>(); for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) { @@ -656,8 +654,8 @@ public class DefaultMetadataService implements MetadataService { } } - private void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) - throws AtlasException { + private void onEntitiesUpdated(List<String> guids) throws AtlasException { + List<ITypedReferenceableInstance> entities = loadEntities(guids); for (EntityChangeListener listener : entityChangeListeners) { listener.onEntitiesUpdated(entities); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java index 1075d85..a49967f 100644 --- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java @@ -346,13 +346,10 @@ public class BaseHiveRepositoryTest { return createInstance(referenceable, clsType); } private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception { -// String entityJSON = InstanceSerialization.toJson(referenceable, true); - - ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED); - String guid = repository.createEntities(typedInstance)[0]; + List<String> guids = repository.createEntities(typedInstance); // return the reference to created instance with guid - return new Id(guid, 0, referenceable.getTypeName()); + return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName()); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java index 78af443..22ff1d6 100755 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java @@ -118,10 +118,10 @@ public class GraphBackedMetadataRepositoryTest { ClassType deptType = typeSystem.getDataType(ClassType.class, "Department"); ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED); - String[] guids = repositoryService.createEntities(hrDept2); + List<String> guids = repositoryService.createEntities(hrDept2); Assert.assertNotNull(guids); - Assert.assertEquals(guids.length, 1); - guid = guids[0]; + Assert.assertEquals(guids.size(), 5); + guid = guids.get(4); Assert.assertNotNull(guid); } @@ -173,14 +173,12 @@ public class GraphBackedMetadataRepositoryTest { ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED); System.out.println("db = " + db); - String dbGUID = repositoryService.createEntities(db)[0]; - System.out.println("added db = " + dbGUID); - - Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap()); - - ITypedReferenceableInstance table = createHiveTableInstance(dbInstance); - String tableGUID = repositoryService.createEntities(table)[0]; - System.out.println("added table = " + tableGUID); + //Reuse the same database instance without id, with the same unique attribute + ITypedReferenceableInstance table = createHiveTableInstance(databaseInstance); + List<String> guids = repositoryService.createEntities(db, table); + Assert.assertEquals(guids.size(), 7); //1 db + 5 columns + 1 table. Shouldn't create db again + System.out.println("added db = " + guids.get(0)); + System.out.println("added table = " + guids.get(6)); } @Test(dependsOnMethods = "testCreateEntity") @@ -600,9 +598,10 @@ public class GraphBackedMetadataRepositoryTest { ClassType deptType = typeSystem.getDataType(ClassType.class, "Department"); ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED); - String[] guids = repositoryService.createEntities(hrDept2); + List<String> guids = repositoryService.createEntities(hrDept2); Assert.assertNotNull(guids); - Assert.assertEquals(guids.length, 1); - Assert.assertNotNull(guids[0]); + Assert.assertEquals(guids.size(), 2); + Assert.assertNotNull(guids.get(0)); + Assert.assertNotNull(guids.get(1)); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java index c25ccf8..2d01bbe 100755 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java @@ -101,7 +101,7 @@ public class GraphRepoMapperScaleTest { ClassType dbType = typeSystem.getDataType(ClassType.class, TestUtils.DATABASE_TYPE); ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED); - dbGUID = repositoryService.createEntities(db)[0]; + dbGUID = repositoryService.createEntities(db).get(0); Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java index 0307fd4..8a80d8c 100644 --- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @Guice(modules = RepositoryMetadataModule.class) @@ -113,7 +114,11 @@ public class DefaultMetadataServiceTest { JSONArray entitiesJson = new JSONArray(); entitiesJson.put(entityjson); String response = metadataService.createEntities(entitiesJson.toString()); - return new JSONArray(response).getString(0); + JSONArray guids = new JSONArray(response); + if (guids != null && guids.length() > 0) { + return guids.getString(0); + } + return null; } private String updateInstance(Referenceable entity) throws Exception { @@ -154,7 +159,7 @@ public class DefaultMetadataServiceTest { //using the same name should succeed, but not create another entity String newId = createInstance(entity); - Assert.assertEquals(newId, id); + assertNull(newId); //Same entity, but different qualified name should succeed entity.set("name", TestUtils.randomString()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/server-api/src/main/java/org/apache/atlas/services/MetadataService.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/services/MetadataService.java b/server-api/src/main/java/org/apache/atlas/services/MetadataService.java index 0cfed2e..a91c0a0 100644 --- a/server-api/src/main/java/org/apache/atlas/services/MetadataService.java +++ b/server-api/src/main/java/org/apache/atlas/services/MetadataService.java @@ -74,7 +74,7 @@ public interface MetadataService { * Creates an entity, instance of the type. * * @param entityDefinition definition - * @return guid + * @return json array of guids of entities created */ String createEntities(String entityDefinition) throws AtlasException; @@ -107,25 +107,28 @@ public interface MetadataService { /** * Adds the property to the given entity id(guid). * Currently supports updates only on PRIMITIVE, CLASS attribute types - * - * @param guid entity id + * @param guid entity id * @param attribute property name * @param value property value + * @return json array of guids of entities created/updated */ - void updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException; + String updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException; /** * Supports Partial updates of an entity. Users can update a subset of attributes for an entity identified by its guid * Note however that it cannot be used to set attribute values to null or delete attrbute values - * + * @param guid entity id + * @param entity + * @return json array of guids of entities created/updated + * @throws AtlasException */ - void updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException; + String updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException; /** * Batch API - Adds/Updates the given entity id(guid). * * @param entityJson entity json - * @return List of guids which were updated and ones which were newly created as part of the updated entity + * @return json array of guids of entities created/updated */ String updateEntities(String entityJson) throws AtlasException; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java index 9d2480b..f5c2ce9 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java @@ -70,14 +70,18 @@ public class TypeUtils { return b; } - protected static class Pair<L, R> { - protected L left; - protected R right; + public static class Pair<L, R> { + public L left; + public R right; public Pair(L left, R right) { this.left = left; this.right = right; } + + public static <L, R> Pair<L, R> of(L left, R right) { + return new Pair<>(left, right); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java index 2ee0027..f5ab4d8 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java @@ -118,8 +118,12 @@ public class EntityResource { JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.GUID, new JSONArray(guids)); - response.put(AtlasClient.DEFINITION, new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0)))); + JSONArray guidArray = new JSONArray(guids); + response.put(AtlasClient.GUID, guidArray); + if (guidArray.length() > 0) { + response.put(AtlasClient.DEFINITION, + new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0)))); + } return Response.created(locationURI).entity(response).build(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java index 4d2cce7..b2501ec 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -58,7 +58,6 @@ public class EntityNotificationIT extends BaseResourceIT { private static final String ENTITIES = "api/atlas/entities"; private static final String TRAITS = "traits"; - private static final int MAX_WAIT_TIME = 10000; private final String DATABASE_NAME = "db" + randomString(); private final String TABLE_NAME = "table" + randomString(); @Inject @@ -98,7 +97,7 @@ public class EntityNotificationIT extends BaseResourceIT { final String guid = tableId._getId(); - waitForNotification(MAX_WAIT_TIME); + waitForNotification(notificationConsumer, MAX_WAIT_TIME); EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); @@ -120,7 +119,7 @@ public class EntityNotificationIT extends BaseResourceIT { serviceClient.updateEntityAttribute(guid, property, newValue); - waitForNotification(MAX_WAIT_TIME); + waitForNotification(notificationConsumer, MAX_WAIT_TIME); EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); @@ -155,7 +154,7 @@ public class EntityNotificationIT extends BaseResourceIT { ClientResponse clientResponse = addTrait(guid, traitInstanceJSON); assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode()); - waitForNotification(MAX_WAIT_TIME); + waitForNotification(notificationConsumer, MAX_WAIT_TIME); EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); @@ -192,7 +191,7 @@ public class EntityNotificationIT extends BaseResourceIT { clientResponse = addTrait(guid, traitInstanceJSON); assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode()); - waitForNotification(MAX_WAIT_TIME); + waitForNotification(notificationConsumer, MAX_WAIT_TIME); entityNotification = notificationConsumer.getLastEntityNotification(); @@ -218,7 +217,7 @@ public class EntityNotificationIT extends BaseResourceIT { ClientResponse clientResponse = deleteTrait(guid, traitName); Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); - waitForNotification(MAX_WAIT_TIME); + waitForNotification(notificationConsumer, MAX_WAIT_TIME); EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); @@ -260,51 +259,4 @@ public class EntityNotificationIT extends BaseResourceIT { return resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) .method(HttpMethod.DELETE, ClientResponse.class); } - - private void waitForNotification(int maxWait) throws Exception { - waitFor(maxWait, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return notificationConsumer.getLastEntityNotification() != null; - } - }); - } - - - // ----- inner class : EntityNotificationConsumer -------------------------- - - private static class EntityNotificationConsumer implements Runnable { - private final NotificationConsumer<EntityNotification> consumerIterator; - private EntityNotification entityNotification = null; - private boolean run; - - public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) { - this.consumerIterator = consumerIterator; - } - - @Override - public void run() { - while (run && consumerIterator.hasNext()) { - entityNotification = consumerIterator.next(); - } - } - - public void reset() { - entityNotification = null; - } - - public void start() { - Thread thread = new Thread(this); - run = true; - thread.start(); - } - - public void stop() { - run = false; - } - - public EntityNotification getLastEntityNotification() { - return entityNotification; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java index 361cece..aba191c 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java @@ -25,6 +25,8 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; import org.apache.atlas.*; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.TypesDef; @@ -68,6 +70,7 @@ public abstract class BaseResourceIT { protected WebResource service; protected AtlasClient serviceClient; public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class); + protected static final int MAX_WAIT_TIME = 1000; @BeforeClass public void setUp() throws Exception { @@ -119,7 +122,10 @@ public abstract class BaseResourceIT { System.out.println("created instance for type " + typeName + ", guid: " + guids); // return the reference to created instance with guid - return new Id(guids.getString(0), 0, referenceable.getTypeName()); + if (guids.length() > 0) { + return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName()); + } + return null; } protected static final String DATABASE_TYPE = "hive_db"; @@ -285,4 +291,50 @@ public abstract class BaseResourceIT { throw new Exception("Waiting timed out after " + timeout + " msec"); } } + + // ----- inner class : EntityNotificationConsumer -------------------------- + + protected static class EntityNotificationConsumer implements Runnable { + private final NotificationConsumer<EntityNotification> consumerIterator; + private EntityNotification entityNotification = null; + private boolean run; + + public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) { + this.consumerIterator = consumerIterator; + } + + @Override + public void run() { + while (run && consumerIterator.hasNext()) { + entityNotification = consumerIterator.next(); + } + } + + public void reset() { + entityNotification = null; + } + + public void start() { + Thread thread = new Thread(this); + run = true; + thread.start(); + } + + public void stop() { + run = false; + } + + public EntityNotification getLastEntityNotification() { + return entityNotification; + } + } + + protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception { + waitFor(maxWait, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return notificationConsumer.getLastEntityNotification() != null; + } + }); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/e1f05fa7/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java index f476af3..a268196 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java @@ -19,10 +19,15 @@ package org.apache.atlas.web.resources; import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; @@ -43,11 +48,14 @@ import org.apache.atlas.web.util.Servlets; import org.apache.commons.lang.RandomStringUtils; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONObject; +import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; import org.testng.annotations.Test; import javax.ws.rs.HttpMethod; @@ -59,10 +67,13 @@ import java.util.Map; import java.util.UUID; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; /** * Integration tests for Entity Jersey Resource. */ +@Guice(modules = NotificationModule.class) public class EntityJerseyResourceIT extends BaseResourceIT { private static final Logger LOG = LoggerFactory.getLogger(EntityJerseyResourceIT.class); @@ -76,11 +87,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT { private Id tableId; private String traitName; + @Inject + private NotificationInterface notificationInterface; + private EntityNotificationConsumer notificationConsumer; + @BeforeClass public void setUp() throws Exception { super.setUp(); createTypeDefinitions(); + + List<NotificationConsumer<EntityNotification>> consumers = + notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); + + NotificationConsumer<EntityNotification> consumer = consumers.iterator().next(); + notificationConsumer = new EntityNotificationConsumer(consumer); + notificationConsumer.start(); + } + + @AfterClass + public void tearDown() { + notificationConsumer.stop(); + } + + @BeforeMethod + public void setupTest() { + notificationConsumer.reset(); } @Test @@ -119,18 +151,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT { @Test public void testEntityDeduping() throws Exception { - Referenceable db = new Referenceable(DATABASE_TYPE); - String dbName = "db" + randomString(); + final Referenceable db = new Referenceable(DATABASE_TYPE); + final String dbName = "db" + randomString(); db.set("name", dbName); db.set("description", randomString()); - serviceClient.createEntity(db); + serviceClient.createEntity(db).getString(0); + + waitForNotification(notificationConsumer, MAX_WAIT_TIME); + EntityNotification notification = notificationConsumer.getLastEntityNotification(); + assertNotNull(notification); + assertEquals(notification.getEntity().get("name"), dbName); + JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); assertEquals(results.length(), 1); //create entity again shouldn't create another instance with same unique attribute value + notificationConsumer.reset(); serviceClient.createEntity(db); + try { + waitForNotification(notificationConsumer, MAX_WAIT_TIME); + fail("Expected time out exception"); + } catch (Exception e) { + //expected timeout + } + results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); assertEquals(results.length(), 1);