Repository: incubator-atlas Updated Branches: refs/heads/master 48c10133e -> 0feb60a2f
ATLAS 1607: notify listeners on classification addition/deletion Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0feb60a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0feb60a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0feb60a2 Branch: refs/heads/master Commit: 0feb60a2f1e70fbc32f765fc67ffeaad6618c997 Parents: 48c1013 Author: Sarath Subramanian <[email protected]> Authored: Tue Feb 28 18:35:30 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Feb 28 22:42:27 2017 -0800 ---------------------------------------------------------------------- .../repository/audit/EntityAuditListener.java | 22 ++++--- .../graph/v1/AtlasEntityChangeNotifier.java | 62 ++++++++++++++++++++ .../store/graph/v1/AtlasEntityStoreV1.java | 12 +++- .../atlas/services/DefaultMetadataService.java | 14 ++--- .../service/DefaultMetadataServiceTest.java | 4 +- .../atlas/listener/EntityChangeListener.java | 8 +-- .../NotificationEntityChangeListener.java | 4 +- 7 files changed, 103 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0feb60a2/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 01c077a..3f03c50 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -77,18 +77,26 @@ public class EntityAuditListener implements EntityChangeListener { } @Override - public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { - EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD, - "Added trait: " + InstanceSerialization.toJson(trait, true)); + public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException { + if (traits != null) { + for (IStruct trait : traits) { + EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD, + "Added trait: " + InstanceSerialization.toJson(trait, true)); - auditRepository.putEvents(event); + auditRepository.putEvents(event); + } + } } @Override - public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException { - EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); + public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException { + if (traitNames != null) { + for (String traitName : traitNames) { + EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); - auditRepository.putEvents(event); + auditRepository.putEvents(event); + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0feb60a2/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index 4ec2a7c..e112b64 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -24,6 +24,7 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutations.EntityOperation; @@ -32,8 +33,10 @@ import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.graph.*; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.ITypedStruct; import org.apache.atlas.util.AtlasRepositoryConfiguration; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +108,39 @@ public class AtlasEntityChangeNotifier { } } + public void onClassificationAddedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException { + ITypedReferenceableInstance entity = toITypedReferenceable(entityId); + List<ITypedStruct> traits = toITypedStructs(classifications); + + if (entity == null || CollectionUtils.isEmpty(traits)) { + return; + } + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsAdded(entity, traits); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e); + } + } + } + + public void onClassificationDeletedFromEntity(String entityId, List<String> traitNames) throws AtlasBaseException { + ITypedReferenceableInstance entity = toITypedReferenceable(entityId); + + if (entity == null || CollectionUtils.isEmpty(traitNames)) { + return; + } + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsDeleted(entity, traitNames); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e); + } + } + } + private void notifyListeners(List<ITypedReferenceableInstance> typedRefInsts, EntityOperation operation) throws AtlasBaseException { for (EntityChangeListener listener : entityChangeListeners) { try { @@ -136,6 +172,32 @@ public class AtlasEntityChangeNotifier { return ret; } + private ITypedReferenceableInstance toITypedReferenceable(String entityId) throws AtlasBaseException { + ITypedReferenceableInstance ret = null; + + if (StringUtils.isNotEmpty(entityId)) { + ret = instanceConverter.getITypedReferenceable(entityId); + } + + return ret; + } + + private List<ITypedStruct> toITypedStructs(List<AtlasClassification> classifications) throws AtlasBaseException { + List<ITypedStruct> ret = null; + + if (classifications != null) { + ret = new ArrayList<>(classifications.size()); + + for (AtlasClassification classification : classifications) { + if (classification != null) { + ret.add(instanceConverter.getTrait(classification)); + } + } + } + + return ret; + } + private void doFullTextMapping(List<AtlasEntityHeader> atlasEntityHeaders) { try { if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0feb60a2/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index af1066d..a3d951d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -430,6 +430,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { EntityGraphMapper graphMapper = new EntityGraphMapper(deleteHandler, typeRegistry); graphMapper.addClassifications(new EntityMutationContext(), guid, classifications); + // notify listeners on classification addition + entityChangeNotifier.onClassificationAddedToEntity(guid, classifications); } @Override @@ -448,8 +450,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { EntityGraphMapper graphMapper = new EntityGraphMapper(deleteHandler, typeRegistry); + List<AtlasClassification> classifications = Collections.singletonList(classification); + for (String guid : guids) { - graphMapper.addClassifications(new EntityMutationContext(), guid, Collections.singletonList(classification)); + graphMapper.addClassifications(new EntityMutationContext(), guid, classifications); + + // notify listeners on classification addition + entityChangeNotifier.onClassificationAddedToEntity(guid, classifications); } } @@ -470,6 +477,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteHandler, typeRegistry); entityGraphMapper.deleteClassifications(guid, classificationNames); + + // notify listeners on classification deletion + entityChangeNotifier.onClassificationDeletedFromEntity(guid, classificationNames); } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0feb60a2/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 993cf61..77db167 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -72,11 +72,7 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import javax.inject.Inject; import javax.inject.Singleton; @@ -724,14 +720,18 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang } private void onTraitAddedToEntity(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { + Collection<IStruct> traits = Collections.singletonList(trait); + for (EntityChangeListener listener : entityChangeListeners) { - listener.onTraitAdded(entity, trait); + listener.onTraitsAdded(entity, traits); } } private void onTraitDeletedFromEntity(ITypedReferenceableInstance entity, String traitName) throws AtlasException { + Collection<String> traitNames = Collections.singletonList(traitName); + for (EntityChangeListener listener : entityChangeListeners) { - listener.onTraitDeleted(entity, traitName); + listener.onTraitsDeleted(entity, traitNames); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0feb60a2/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java index 96c2ce5..7e828a1 100644 --- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java @@ -1258,12 +1258,12 @@ public class DefaultMetadataServiceTest { } @Override - public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) + public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException { } @Override - public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) + public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException { } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0feb60a2/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java index e9a7d1a..256e839 100644 --- a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java +++ b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java @@ -52,21 +52,21 @@ public interface EntityChangeListener { * This is upon adding a new trait to a typed instance. * * @param entity the entity - * @param trait trait that needs to be added to entity + * @param traits trait that needs to be added to entity * * @throws AtlasException if the listener notification fails */ - void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException; + void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException; /** * This is upon deleting a trait from a typed instance. * * @param entity the entity - * @param traitName trait name for the instance that needs to be deleted from entity + * @param traitNames trait name for the instance that needs to be deleted from entity * * @throws AtlasException if the listener notification fails */ - void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException; + void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException; /** * This is upon deleting entities from the repository. http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0feb60a2/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index 8a1991c..978b21d 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -87,12 +87,12 @@ public class NotificationEntityChangeListener implements EntityChangeListener { } @Override - public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { + public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException { notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD); } @Override - public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException { + public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException { notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE); }
