Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 1584139c2 -> 900f99bb4


ATLAS-3002: added instrumentation to collect time taken for sub-tasks during 
entity create/update


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/900f99bb
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/900f99bb
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/900f99bb

Branch: refs/heads/branch-0.8
Commit: 900f99bb47278fbc1ca6d7ed4f55fd715aac6ad2
Parents: 1584139
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Tue Dec 18 13:01:26 2018 -0800
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Tue Dec 18 14:58:29 2018 -0800

----------------------------------------------------------------------
 .../hive/hook/events/CreateHiveProcess.java     |   2 +-
 distro/src/conf/atlas-log4j.xml                 |  19 +++-
 .../apache/atlas/kafka/KafkaNotification.java   |   3 +
 .../atlas/GraphTransactionInterceptor.java      |   8 ++
 .../repository/audit/EntityAuditListener.java   |  25 +++++
 .../converters/AtlasInstanceConverter.java      |  16 +--
 .../repository/graph/FullTextMapperV2.java      |  91 +++++++++++-----
 .../graph/v1/AtlasEntityChangeNotifier.java     |  10 ++
 .../graph/v1/AtlasEntityGraphDiscoveryV1.java   |  10 ++
 .../store/graph/v1/AtlasEntityStoreV1.java      |   9 ++
 .../store/graph/v1/AtlasGraphUtilsV1.java       |  22 +++-
 .../store/graph/v1/EntityGraphMapper.java       |   5 +
 .../java/org/apache/atlas/RequestContextV1.java |  14 ++-
 .../java/org/apache/atlas/metrics/Metrics.java  | 109 ++++++++++++++-----
 .../NotificationEntityChangeListener.java       |   7 +-
 .../notification/NotificationHookConsumer.java  |  12 ++
 16 files changed, 294 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 84f9c97..aeb980a 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -181,7 +181,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
 
             AtlasEntity columnLineageProcess = new 
AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
 
-            columnLineageProcess.setAttribute(ATTRIBUTE_NAME, 
hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + 
outputColumn.getAttribute(ATTRIBUTE_NAME));
+            columnLineageProcess.setAttribute(ATTRIBUTE_NAME, 
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + 
outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + 
outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, 
getObjectIds(inputColumns));
             columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, 
Collections.singletonList(getObjectId(outputColumn)));

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/distro/src/conf/atlas-log4j.xml
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index a0f9629..fc9629b 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -37,6 +37,16 @@
         </layout>
     </appender>
 
+    <appender name="LARGE_MESSAGES" 
class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="{{log_dir}}/large_messages.log"/>
+        <param name="Append" value="true"/>
+        <param name="MaxFileSize" value="100MB" />
+        <param name="MaxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
         <param name="File" value="${atlas.log.dir}/audit.log"/>
         <param name="Append" value="true"/>
@@ -60,7 +70,7 @@
         <param name="File" value="${atlas.log.dir}/failed.log"/>
         <param name="Append" value="true"/>
         <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %m"/>
+            <param name="ConversionPattern" value="%d %m%n"/>
             <param name="maxFileSize" value="100MB" />
             <param name="maxBackupIndex" value="20" />
         </layout>
@@ -119,6 +129,11 @@
         <appender-ref ref="AUDIT"/>
     </logger>
 
+    <logger name="LARGE_MESSAGES" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="LARGE_MESSAGES"/>
+    </logger>
+
     <logger name="METRICS" additivity="false">
         <level value="debug"/>
         <appender-ref ref="METRICS"/>
@@ -126,7 +141,7 @@
 
     <logger name="FAILED" additivity="false">
         <level value="info"/>
-        <appender-ref ref="AUDIT"/>
+        <appender-ref ref="FAILED"/>
     </logger>
 
     <root>

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 73bc315..21dae95 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -106,6 +106,9 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
         properties.put("enable.auto.commit", 
kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag));
         properties.put("session.timeout.ms", 
kafkaConf.getString("session.timeout.ms", "30000"));
 
+        // if no value is specified for max.poll.records, set to 1
+        properties.put("max.poll.records", 
kafkaConf.getInt("max.poll.records", 1));
+
         LOG.info("<== KafkaNotification()");
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java 
b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index c6a4bbe..58dd36f 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.typesystem.exception.NotFoundException;
 import org.slf4j.Logger;
@@ -54,11 +55,16 @@ public class GraphTransactionInterceptor implements 
MethodInterceptor {
     @Override
     public Object invoke(MethodInvocation invocation) throws Throwable {
         boolean isSuccess = false;
+        MetricRecorder metric = null;
 
         try {
             try {
                 Object response = invocation.proceed();
+
+                metric = 
RequestContextV1.get().startMetricRecord("graphCommit");
+
                 graph.commit();
+
                 isSuccess = true;
 
                 if (LOG.isDebugEnabled()) {
@@ -76,6 +82,8 @@ public class GraphTransactionInterceptor implements 
MethodInterceptor {
                 throw t;
             }
         } finally {
+            RequestContextV1.get().endMetricRecord(metric);
+
             List<PostTransactionHook> trxHooks = postTransactionHooks.get();
 
             if (trxHooks != null) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
index 2a1881b..64b7f55 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -23,6 +23,7 @@ import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
@@ -58,6 +59,8 @@ public class EntityAuditListener implements 
EntityChangeListener {
 
     @Override
     public void onEntitiesAdded(Collection<ITypedReferenceableInstance> 
entities, boolean isImport) throws AtlasException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
         for (ITypedReferenceableInstance entity : entities) {
             EntityAuditEvent event = createEvent(entity, isImport ? 
EntityAuditAction.ENTITY_IMPORT_CREATE : EntityAuditAction.ENTITY_CREATE);
@@ -65,10 +68,14 @@ public class EntityAuditListener implements 
EntityChangeListener {
         }
 
         auditRepository.putEvents(events);
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     @Override
     public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> 
entities, boolean isImport) throws AtlasException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
         for (ITypedReferenceableInstance entity : entities) {
             EntityAuditEvent event = createEvent(entity, isImport ? 
EntityAuditAction.ENTITY_IMPORT_UPDATE : EntityAuditAction.ENTITY_UPDATE);
@@ -76,10 +83,14 @@ public class EntityAuditListener implements 
EntityChangeListener {
         }
 
         auditRepository.putEvents(events);
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     @Override
     public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? 
extends IStruct> traits) throws AtlasException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("entityAudit");
+
         if (traits != null) {
             for (IStruct trait : traits) {
                 EntityAuditEvent event = createEvent(entity, 
EntityAuditAction.TAG_ADD,
@@ -88,10 +99,14 @@ public class EntityAuditListener implements 
EntityChangeListener {
                 auditRepository.putEvents(event);
             }
         }
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     @Override
     public void onTraitsDeleted(ITypedReferenceableInstance entity, 
Collection<String> traitNames) throws AtlasException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("entityAudit");
+
         if (traitNames != null) {
             for (String traitName : traitNames) {
                 EntityAuditEvent event = createEvent(entity, 
EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
@@ -99,10 +114,14 @@ public class EntityAuditListener implements 
EntityChangeListener {
                 auditRepository.putEvents(event);
             }
         }
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     @Override
     public void onTraitsUpdated(ITypedReferenceableInstance entity, 
Collection<? extends IStruct> traits) throws AtlasException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("entityAudit");
+
         if (traits != null) {
             for (IStruct trait : traits) {
                 EntityAuditEvent event = createEvent(entity, 
EntityAuditAction.TAG_UPDATE,
@@ -111,10 +130,14 @@ public class EntityAuditListener implements 
EntityChangeListener {
                 auditRepository.putEvents(event);
             }
         }
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     @Override
     public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> 
entities, boolean isImport) throws AtlasException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("entityAudit");
+
         List<EntityAuditEvent> events = new ArrayList<>();
         for (ITypedReferenceableInstance entity : entities) {
             EntityAuditEvent event = createEvent(entity, isImport ? 
EntityAuditAction.ENTITY_IMPORT_DELETE : EntityAuditAction.ENTITY_DELETE, 
"Deleted entity");
@@ -122,6 +145,8 @@ public class EntityAuditListener implements 
EntityChangeListener {
         }
 
         auditRepository.putEvents(events);
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     public List<EntityAuditEvent> getAuditEvents(String guid) throws 
AtlasException{

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
 
b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 9bde5db..b4d399a 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.converters;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.CreateUpdateEntitiesResult;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
@@ -99,17 +100,18 @@ public class AtlasInstanceConverter {
     }
 
     public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity 
entity) throws AtlasBaseException {
-        try {
-            return metadataService.getEntityDefinition(entity.getGuid());
-        } catch (AtlasException e) {
-            LOG.error("Exception while getting a typed reference for the 
entity ", e);
-            throw toAtlasBaseException(e);
-        }
+        return getITypedReferenceable(entity.getGuid());
     }
 
     public ITypedReferenceableInstance getITypedReferenceable(String guid) 
throws AtlasBaseException {
         try {
-            return metadataService.getEntityDefinition(guid);
+            ITypedReferenceableInstance ret = 
RequestContext.get().getInstanceV1(guid);
+
+            if (ret == null) {
+                ret = metadataService.getEntityDefinition(guid);
+            }
+
+            return ret;
         } catch (AtlasException e) {
             LOG.error("Exception while getting a typed reference for the 
entity ", e);
             throw toAtlasBaseException(e);

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
index 5b5158c..84cd999 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
@@ -26,6 +26,13 @@ import 
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -37,6 +44,7 @@ import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,17 +60,19 @@ public class FullTextMapperV2 {
     private static final String FULL_TEXT_FOLLOW_REFERENCES          = 
"atlas.search.fulltext.followReferences";
     private static final String FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY = 
"atlas.search.fulltext.type";
 
+    private final AtlasTypeRegistry        typeRegistry;
+    private final Configuration            configuration;
     private final EntityGraphRetriever     entityGraphRetriever;
     private final boolean                  followReferences;
     private final Map<String, Set<String>> excludeAttributesCache = new 
HashMap<>();
 
-    private Configuration APPLICATION_PROPERTIES = null;
 
     @Inject
     public FullTextMapperV2(AtlasTypeRegistry typeRegistry, Configuration 
configuration) {
-        entityGraphRetriever   = new EntityGraphRetriever(typeRegistry);
-        APPLICATION_PROPERTIES = configuration;
-        followReferences       = APPLICATION_PROPERTIES != null && 
APPLICATION_PROPERTIES.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false);
+        this.typeRegistry    = typeRegistry;
+        this.configuration   = configuration;
+        entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
+        followReferences     = this.configuration != null && 
this.configuration.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false);
     }
 
     /**
@@ -89,11 +99,12 @@ public class FullTextMapperV2 {
 
             if (CollectionUtils.isNotEmpty(classifications)) {
                 for (AtlasClassification classification : classifications) {
-                    
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+                    final AtlasClassificationType classificationType = 
typeRegistry.getClassificationTypeByName(classification.getTypeName());
+                    final Set<String>             excludeAttributes  = 
getExcludeAttributesForIndexText(classification.getTypeName());
 
-                    Set<String> excludeAttributes = 
getExcludeAttributesForIndexText(classification.getTypeName());
+                    
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
 
-                    mapAttributes(classification.getAttributes(), 
entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes);
+                    mapAttributes(classificationType, 
classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), 
excludeAttributes);
                 }
             }
 
@@ -109,12 +120,23 @@ public class FullTextMapperV2 {
 
     public String getIndexTextForEntity(String guid) throws AtlasBaseException 
{
         String      ret    = null;
-        AtlasEntity entity = getAndCacheEntity(guid);
+        final AtlasEntity        entity;
+        final AtlasEntityExtInfo entityExtInfo;
+
+        if (followReferences) {
+            AtlasEntityWithExtInfo entityWithExtInfo = 
getAndCacheEntityWithExtInfo(guid);
+
+            entity        = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null;
+            entityExtInfo = entityWithExtInfo;
+        } else {
+            entity        = getAndCacheEntity(guid);
+            entityExtInfo = null;
+        }
 
         if (entity != null) {
             StringBuilder sb = new StringBuilder();
 
-            map(entity, null, sb, new HashSet<String>());
+            map(entity, entityExtInfo, sb, new HashSet<String>());
 
             ret = sb.toString();
         }
@@ -131,27 +153,29 @@ public class FullTextMapperV2 {
             return;
         }
 
+        final AtlasEntityType entityType        = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+        final Set<String>     excludeAttributes = 
getExcludeAttributesForIndexText(entity.getTypeName());
+
         processedGuids.add(entity.getGuid());
 
         sb.append(entity.getTypeName()).append(FULL_TEXT_DELIMITER);
 
-        Set<String> excludeAttributes = 
getExcludeAttributesForIndexText(entity.getTypeName());
-
-        mapAttributes(entity.getAttributes(), entityExtInfo, sb, 
processedGuids, excludeAttributes);
+        mapAttributes(entityType, entity.getAttributes(), entityExtInfo, sb, 
processedGuids, excludeAttributes);
 
-        List<AtlasClassification> classifications = 
entity.getClassifications();
+        final List<AtlasClassification> classifications = 
entity.getClassifications();
         if (CollectionUtils.isNotEmpty(classifications)) {
             for (AtlasClassification classification : classifications) {
-                
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+                final AtlasClassificationType classificationType              
= typeRegistry.getClassificationTypeByName(classification.getTypeName());
+                final Set<String>             excludeClassificationAttributes 
= getExcludeAttributesForIndexText(classification.getTypeName());
 
-                Set<String> excludeClassificationAttributes = 
getExcludeAttributesForIndexText(classification.getTypeName());
+                
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
 
-                mapAttributes(classification.getAttributes(), entityExtInfo, 
sb, processedGuids, excludeClassificationAttributes);
+                mapAttributes(classificationType, 
classification.getAttributes(), entityExtInfo, sb, processedGuids, 
excludeClassificationAttributes);
             }
         }
     }
 
-    private void mapAttributes(Map<String, Object> attributes, 
AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
+    private void mapAttributes(AtlasStructType structType, Map<String, Object> 
attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
                                Set<String> processedGuids, Set<String> 
excludeAttributes) throws AtlasBaseException {
         if (MapUtils.isEmpty(attributes)) {
             return;
@@ -161,10 +185,27 @@ public class FullTextMapperV2 {
             String attribKey = attributeEntry.getKey();
             Object attrValue = attributeEntry.getValue();
 
-            if (attrValue == null || isExcludedAttribute(excludeAttributes, 
attribKey)) {
+            if (attrValue == null || excludeAttributes.contains(attribKey)) {
                 continue;
             }
 
+            if (!followReferences) {
+                AtlasAttribute attribute     = structType != null ? 
structType.getAttribute(attribKey) : null;
+                AtlasType      attributeType = attribute != null ? 
attribute.getAttributeType() : null;
+
+                if (attributeType == null) {
+                    continue;
+                }
+
+                if (attributeType instanceof AtlasArrayType) {
+                    attributeType = ((AtlasArrayType) 
attributeType).getElementType();
+                }
+
+                if (attributeType instanceof AtlasEntityType || attributeType 
instanceof AtlasBuiltInTypes.AtlasObjectIdType) {
+                    continue;
+                }
+            }
+
             sb.append(attribKey).append(FULL_TEXT_DELIMITER);
 
             mapAttribute(attrValue, entityExtInfo, sb, processedGuids);
@@ -247,25 +288,25 @@ public class FullTextMapperV2 {
         return entityWithExtInfo;
     }
 
-    private boolean isExcludedAttribute(Set<String> excludeAttributes, String 
attributeName) {
-        return CollectionUtils.isNotEmpty(excludeAttributes) && 
excludeAttributes.contains(attributeName);
-    }
-
     private Set<String> getExcludeAttributesForIndexText(String typeName) {
-        Set<String> ret = null;
+        final Set<String> ret;
 
         if (excludeAttributesCache.containsKey(typeName)) {
             ret = excludeAttributesCache.get(typeName);
 
-        } else if (APPLICATION_PROPERTIES != null) {
-            String[] excludeAttributes = 
APPLICATION_PROPERTIES.getStringArray(FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY + 
"." +
+        } else if (configuration != null) {
+            String[] excludeAttributes = 
configuration.getStringArray(FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY + "." +
                                                                                
typeName + "." + "attributes.exclude");
 
             if (ArrayUtils.isNotEmpty(excludeAttributes)) {
                 ret = new HashSet<>(Arrays.asList(excludeAttributes));
+            } else {
+                ret = Collections.emptySet();
             }
 
             excludeAttributesCache.put(typeName, ret);
+        } else {
+            ret = Collections.emptySet();
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index 92d95f9..054ebd1 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -20,8 +20,10 @@ package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
@@ -186,6 +188,8 @@ public class AtlasEntityChangeNotifier {
     }
 
     private List<ITypedReferenceableInstance> 
toITypedReferenceable(List<AtlasEntityHeader> entityHeaders, EntityOperation 
operation) throws AtlasBaseException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("notification-getReferenceable");
+
         List<ITypedReferenceableInstance> ret = new 
ArrayList<>(entityHeaders.size());
 
         // delete notifications need only entity-guid. In case of hard-delete, 
getITypedReferenceable() call below will
@@ -200,6 +204,8 @@ public class AtlasEntityChangeNotifier {
             }
         }
 
+        RequestContextV1.get().endMetricRecord(metric);
+
         return ret;
     }
 
@@ -242,6 +248,8 @@ public class AtlasEntityChangeNotifier {
             LOG.warn("Unable to determine if FullText is disabled. Proceeding 
with FullText mapping");
         }
 
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("fullTextMapping");
+
         for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) {
             String      guid        = atlasEntityHeader.getGuid();
             AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(guid);
@@ -258,6 +266,8 @@ public class AtlasEntityChangeNotifier {
                 LOG.error("FullText mapping failed for Vertex[ guid = {} ]", 
guid, e);
             }
         }
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     private void updateFullTextMapping(String entityId, 
List<AtlasClassification> classifications) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 12e8bb1..66a92fa 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -18,7 +18,9 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
@@ -127,6 +129,8 @@ public class AtlasEntityGraphDiscoveryV1 implements 
EntityGraphDiscovery {
 
 
     protected void discover() throws AtlasBaseException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("walkEntityGraph");
+
         EntityStream entityStream = discoveryContext.getEntityStream();
 
         Set<String> walkedEntities = new HashSet<>();
@@ -162,9 +166,13 @@ public class AtlasEntityGraphDiscoveryV1 implements 
EntityGraphDiscovery {
                 walkedEntities.add(entity.getGuid());
             }
         }
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     protected void resolveReferences() throws AtlasBaseException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("resolveReferences");
+
         EntityResolver[] entityResolvers = new EntityResolver[] { new 
IDBasedEntityResolver(typeRegistry),
                                                                   new 
UniqAttrBasedEntityResolver(typeRegistry)
                                                                 };
@@ -172,6 +180,8 @@ public class AtlasEntityGraphDiscoveryV1 implements 
EntityGraphDiscovery {
         for (EntityResolver resolver : entityResolvers) {
             resolver.resolveEntityReferences(discoveryContext);
         }
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     private void visitReference(AtlasObjectIdType type, Object val) throws 
AtlasBaseException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index f578ded..1d89c83 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -23,6 +23,7 @@ import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.model.instance.AtlasCheckStateRequest;
 import org.apache.atlas.model.instance.AtlasCheckStateResult;
 import org.apache.atlas.model.instance.AtlasClassification;
@@ -218,6 +219,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore 
{
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"no entities to create/update.");
         }
 
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("createOrUpdate");
+
         // Create/Update entities
         EntityMutationContext context = preCreateOrUpdate(entityStream, 
entityGraphMapper, isPartialUpdate);
 
@@ -232,6 +235,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore 
{
         // Notify the change listeners
         entityChangeNotifier.onEntitiesMutated(ret, entityStream instanceof 
EntityImportStream);
 
+        RequestContextV1.get().endMetricRecord(metric);
+
         return ret;
     }
 
@@ -606,6 +611,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore 
{
     }
 
     private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, 
EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws 
AtlasBaseException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("preCreateOrUpdate");
+
         EntityGraphDiscovery        graphDiscoverer  = new 
AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
         EntityGraphDiscoveryContext discoveryContext = 
graphDiscoverer.discoverEntities();
         EntityMutationContext       context          = new 
EntityMutationContext(discoveryContext);
@@ -668,6 +675,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore 
{
             }
         }
 
+        RequestContextV1.get().endMetricRecord(metric);
+
         return context;
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 65c5d41..03d94ce 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -20,8 +20,10 @@ package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.discovery.SearchProcessor;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
@@ -245,6 +247,8 @@ public class AtlasGraphUtilsV1 {
     }
 
     public static AtlasVertex findByUniqueAttributes(AtlasEntityType 
entityType, Map<String, Object> attrValues) {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("findByUniqueAttributes");
+
         AtlasVertex vertex = null;
 
         final Map<String, AtlasAttribute> uniqueAttributes = 
entityType.getUniqAttributes();
@@ -278,6 +282,8 @@ public class AtlasGraphUtilsV1 {
             }
         }
 
+        RequestContextV1.get().endMetricRecord(metric);
+
         return vertex;
     }
 
@@ -309,28 +315,36 @@ public class AtlasGraphUtilsV1 {
     }
 
     public static AtlasVertex findByTypeAndPropertyName(String typeName, 
String propertyName, Object attrVal) {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("findByTypeAndPropertyName");
+
         AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                     
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
-                                                    
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
-                                                    .has(propertyName, 
attrVal);
+                                                    .has(propertyName, attrVal)
+                                                    
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
 
         Iterator<AtlasVertex> results = query.vertices().iterator();
 
         AtlasVertex vertex = results.hasNext() ? results.next() : null;
 
+        RequestContextV1.get().endMetricRecord(metric);
+
         return vertex;
     }
 
     public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, 
String propertyName, Object attrVal) {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("findBySuperTypeAndPropertyName");
+
         AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
                                                     
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
-                                                    
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
-                                                    .has(propertyName, 
attrVal);
+                                                    .has(propertyName, attrVal)
+                                                    
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
 
         Iterator<AtlasVertex> results = query.vertices().iterator();
 
         AtlasVertex vertex = results.hasNext() ? results.next() : null;
 
+        RequestContextV1.get().endMetricRecord(metric);
+
         return vertex;
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index e818ed5..a9bf55a 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -24,6 +24,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -135,6 +136,8 @@ public class EntityGraphMapper {
     }
 
     public EntityMutationResponse 
mapAttributesAndClassifications(EntityMutationContext context, final boolean 
isPartialUpdate, final boolean replaceClassifications) throws 
AtlasBaseException {
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("mapAttributesAndClassifications");
+
         EntityMutationResponse resp = new EntityMutationResponse();
 
         Collection<AtlasEntity> createdEntities = context.getCreatedEntities();
@@ -189,6 +192,8 @@ public class EntityGraphMapper {
             }
         }
 
+        RequestContextV1.get().endMetricRecord(metric);
+
         return resp;
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java 
b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
index b85769c..5ae8055 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java
@@ -19,6 +19,7 @@
 package org.apache.atlas;
 
 import org.apache.atlas.metrics.Metrics;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.store.DeleteType;
@@ -34,7 +35,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class RequestContextV1 {
-    private static final Logger LOG = 
LoggerFactory.getLogger(RequestContextV1.class);
+    private static final Logger METRICS = LoggerFactory.getLogger("METRICS");
 
     private static final ThreadLocal<RequestContextV1> CURRENT_CONTEXT = new 
ThreadLocal<>();
     private static final Set<RequestContextV1>         ACTIVE_REQUESTS = new 
HashSet<>();
@@ -91,6 +92,12 @@ public class RequestContextV1 {
         if (this.entityGuidInRequest != null) {
             this.entityGuidInRequest.clear();
         }
+ 
+        if (METRICS.isDebugEnabled() && !metrics.isEmpty()) {
+            METRICS.debug(metrics.toString());
+        }
+
+        metrics.clear();
     }
 
     public static int getActiveRequestsCount() {
@@ -181,11 +188,14 @@ public class RequestContextV1 {
         return deletedEntities.containsKey(guid);
     }
 
+    public MetricRecorder startMetricRecord(String name) { return 
metrics.getMetricRecorder(name); }
+
+    public void endMetricRecord(MetricRecorder recorder) { 
metrics.recordMetric(recorder); }
+
     public static Metrics getMetrics() {
         return get().metrics;
     }
 
-
     public void recordEntityGuidUpdate(AtlasEntity entity, String 
guidInRequest) {
         if (entityGuidInRequest == null) {
             entityGuidInRequest = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java 
b/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java
index e0f4e49..6bb8c2a 100644
--- a/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java
+++ b/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java
@@ -18,51 +18,108 @@
 
 package org.apache.atlas.metrics;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 
 public class Metrics {
-    public static class Counters {
-        private short invocations = 0;
-        private long totalTimeMSecs = 0;
+    private static final Logger METRICS = LoggerFactory.getLogger("METRICS");
 
-        @Override
-        public String toString() {
-            return "[count=" + invocations + ", totalTimeMSec=" + 
totalTimeMSecs + "]";
-        }
+    private final Map<String, Metric> metrics = new LinkedHashMap<>();
 
-        public short getInvocations() {
-            return invocations;
-        }
 
-        public long getTotalTimeMSecs() {
-            return totalTimeMSecs;
-        }
+    public MetricRecorder getMetricRecorder(String name) {
+        return METRICS.isDebugEnabled() ? new MetricRecorder(name) : null;
     }
 
-    Map<String, Counters> countersMap = new LinkedHashMap<>();
+    public void recordMetric(MetricRecorder recorder) {
+        if (recorder != null) {
+            final String name      = recorder.name;
+            final long   timeTaken = recorder.getElapsedTime();
+
+            Metric metric = metrics.get(name);
+
+            if (metric == null) {
+                metric = new Metric(name);
+
+                metrics.put(name, metric);
+            }
 
-    public void record(String name, long timeMsecs) {
-        Counters counter = countersMap.get(name);
-        if (counter == null) {
-            counter = new Counters();
-            countersMap.put(name, counter);
+            metric.invocations++;
+            metric.totalTimeMSecs += timeTaken;
         }
+    }
+
+    public void clear() {
+        metrics.clear();
+    }
+
+    public boolean isEmpty() {
+        return metrics.isEmpty();
+    }
+
+    public Set<String> getMetricsNames() {
+        return metrics.keySet();
+    }
 
-        counter.invocations++;
-        counter.totalTimeMSecs += timeMsecs;
+    public Metric getMetric(String name) {
+        return metrics.get(name);
     }
 
     @Override
     public String toString() {
-        return countersMap.toString();
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("{");
+
+        if (!metrics.isEmpty()) {
+            for (Metric metric : metrics.values()) {
+                
sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},");
+            }
+
+            sb.setLength(sb.length() - 1); // remove last ","
+        }
+
+        sb.append("}");
+
+        return sb.toString();
     }
 
-    public boolean isEmpty() {
-        return countersMap.isEmpty();
+    public class MetricRecorder {
+        private final String name;
+        private final long   startTimeMs = System.currentTimeMillis();
+
+        MetricRecorder(String name) {
+            this.name = name;
+        }
+
+        long getElapsedTime() {
+            return System.currentTimeMillis() - startTimeMs;
+        }
     }
 
-    public Counters getCounters(String name) {
-        return countersMap.get(name);
+    public static class Metric {
+        private final String name;
+        private       short  invocations    = 0;
+        private       long   totalTimeMSecs = 0;
+
+        public Metric(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public short getInvocations() {
+            return invocations;
+        }
+
+        public long getTotalTimeMSecs() {
+            return totalTimeMSecs;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index 926acfb..02bfccc 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -20,7 +20,9 @@ package org.apache.atlas.notification;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.metrics.Metrics.MetricRecorder;
 import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.notification.entity.EntityNotificationImpl;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -30,7 +32,6 @@ import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.typesystem.types.AttributeInfo;
 import org.apache.atlas.typesystem.types.ClassType;
 import org.apache.atlas.typesystem.types.FieldMapping;
@@ -178,6 +179,8 @@ public class NotificationEntityChangeListener implements 
EntityChangeListener {
                                      EntityNotification.OperationType 
operationType) throws AtlasException {
         List<EntityNotification> messages = new LinkedList<>();
 
+        MetricRecorder metric = 
RequestContextV1.get().startMetricRecord("entityNotification");
+
         for (IReferenceableInstance entityDefinition : entityDefinitions) {
             if(GraphHelper.isInternalType(entityDefinition.getTypeName())) {
                 continue;
@@ -207,6 +210,8 @@ public class NotificationEntityChangeListener implements 
EntityChangeListener {
         if (!messages.isEmpty()) {
             
notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, 
messages);
         }
+
+        RequestContextV1.get().endMetricRecord(metric);
     }
 
     private List<String> getNotificationAttributes(String entityType) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index a26ab8b..2d2a6fb 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -93,6 +93,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private static final Logger PERF_LOG = 
AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
     private static final String LOCALHOST = "localhost";
     private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+    private static Logger LARGE_MESSAGES_LOG = 
LoggerFactory.getLogger("LARGE_MESSAGES");
 
     private static final String TYPE_HIVE_COLUMN_LINEAGE = 
"hive_column_lineage";
     private static final String ATTRIBUTE_INPUTS         = "inputs";
@@ -123,6 +124,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final int                    failedMsgCacheSize;
     private final boolean                skipHiveColumnLineageHive20633;
     private final int                    
skipHiveColumnLineageHive20633InputsThreshold;
+    private final int                    largeMessageProcessingTimeThresholdMs;
     private final boolean                consumerDisabled;
 
     @VisibleForTesting
@@ -157,6 +159,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
true);
         skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
+        largeMessageProcessingTimeThresholdMs         = 
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
 60 * 1000);  //  60 sec by default
         consumerDisabled                                                       
  = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
@@ -587,6 +590,15 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                 commit(kafkaMsg);
             } finally {
                 AtlasPerfTracer.log(perf);
+
+                long msgProcessingTime = perf != null ? perf.getElapsedTime() 
: 0;
+
+                if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) 
{
+                    String strMessage = 
AbstractNotification.getMessageJson(message);
+
+                    LOG.warn("msgProcessingTime={}, msgSize={}, 
topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset());
+                    
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}",
 msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+                }
             }
         }
 

Reply via email to