Repository: atlas Updated Branches: refs/heads/branch-0.8 1584139c2 -> 900f99bb4
ATLAS-3002: added instrumentation to collect time taken for sub-tasks during entity create/update Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/900f99bb Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/900f99bb Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/900f99bb Branch: refs/heads/branch-0.8 Commit: 900f99bb47278fbc1ca6d7ed4f55fd715aac6ad2 Parents: 1584139 Author: Madhan Neethiraj <mad...@apache.org> Authored: Tue Dec 18 13:01:26 2018 -0800 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Tue Dec 18 14:58:29 2018 -0800 ---------------------------------------------------------------------- .../hive/hook/events/CreateHiveProcess.java | 2 +- distro/src/conf/atlas-log4j.xml | 19 +++- .../apache/atlas/kafka/KafkaNotification.java | 3 + .../atlas/GraphTransactionInterceptor.java | 8 ++ .../repository/audit/EntityAuditListener.java | 25 +++++ .../converters/AtlasInstanceConverter.java | 16 +-- .../repository/graph/FullTextMapperV2.java | 91 +++++++++++----- .../graph/v1/AtlasEntityChangeNotifier.java | 10 ++ .../graph/v1/AtlasEntityGraphDiscoveryV1.java | 10 ++ .../store/graph/v1/AtlasEntityStoreV1.java | 9 ++ .../store/graph/v1/AtlasGraphUtilsV1.java | 22 +++- .../store/graph/v1/EntityGraphMapper.java | 5 + .../java/org/apache/atlas/RequestContextV1.java | 14 ++- .../java/org/apache/atlas/metrics/Metrics.java | 109 ++++++++++++++----- .../NotificationEntityChangeListener.java | 7 +- .../notification/NotificationHookConsumer.java | 12 ++ 16 files changed, 294 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 84f9c97..aeb980a 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -181,7 +181,7 @@ public class CreateHiveProcess extends BaseHiveEvent { AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE); - columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); + columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns)); columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn))); http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/distro/src/conf/atlas-log4j.xml ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml index a0f9629..fc9629b 100755 --- a/distro/src/conf/atlas-log4j.xml +++ b/distro/src/conf/atlas-log4j.xml @@ -37,6 +37,16 @@ </layout> </appender> + <appender name="LARGE_MESSAGES" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="{{log_dir}}/large_messages.log"/> + <param name="Append" value="true"/> + <param name="MaxFileSize" value="100MB" /> + <param name="MaxBackupIndex" value="20" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %m%n"/> + </layout> + </appender> + <appender name="AUDIT" class="org.apache.log4j.RollingFileAppender"> <param name="File" value="${atlas.log.dir}/audit.log"/> <param name="Append" value="true"/> @@ -60,7 +70,7 @@ <param name="File" value="${atlas.log.dir}/failed.log"/> <param name="Append" value="true"/> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %m"/> + <param name="ConversionPattern" value="%d %m%n"/> <param name="maxFileSize" value="100MB" /> <param name="maxBackupIndex" value="20" /> </layout> @@ -119,6 +129,11 @@ <appender-ref ref="AUDIT"/> </logger> + <logger name="LARGE_MESSAGES" additivity="false"> + <level value="warn"/> + <appender-ref ref="LARGE_MESSAGES"/> + </logger> + <logger name="METRICS" additivity="false"> <level value="debug"/> <appender-ref ref="METRICS"/> @@ -126,7 +141,7 @@ <logger name="FAILED" additivity="false"> <level value="info"/> - <appender-ref ref="AUDIT"/> + <appender-ref ref="FAILED"/> </logger> <root> http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 73bc315..21dae95 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -106,6 +106,9 @@ public class KafkaNotification extends AbstractNotification implements Service { properties.put("enable.auto.commit", kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag)); properties.put("session.timeout.ms", kafkaConf.getString("session.timeout.ms", "30000")); + // if no value is specified for max.poll.records, set to 1 + properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1)); + LOG.info("<== KafkaNotification()"); } http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index c6a4bbe..58dd36f 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.typesystem.exception.NotFoundException; import org.slf4j.Logger; @@ -54,11 +55,16 @@ public class GraphTransactionInterceptor implements MethodInterceptor { @Override public Object invoke(MethodInvocation invocation) throws Throwable { boolean isSuccess = false; + MetricRecorder metric = null; try { try { Object response = invocation.proceed(); + + metric = RequestContextV1.get().startMetricRecord("graphCommit"); + graph.commit(); + isSuccess = true; if (LOG.isDebugEnabled()) { @@ -76,6 +82,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor { throw t; } } finally { + RequestContextV1.get().endMetricRecord(metric); + List<PostTransactionHook> trxHooks = postTransactionHooks.get(); if (trxHooks != null) { http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 2a1881b..64b7f55 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -23,6 +23,7 @@ import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.RequestContextV1; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -58,6 +59,8 @@ public class EntityAuditListener implements EntityChangeListener { @Override public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("entityAudit"); + List<EntityAuditEvent> events = new ArrayList<>(); for (ITypedReferenceableInstance entity : entities) { EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_CREATE : EntityAuditAction.ENTITY_CREATE); @@ -65,10 +68,14 @@ public class EntityAuditListener implements EntityChangeListener { } auditRepository.putEvents(events); + + RequestContextV1.get().endMetricRecord(metric); } @Override public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("entityAudit"); + List<EntityAuditEvent> events = new ArrayList<>(); for (ITypedReferenceableInstance entity : entities) { EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_UPDATE : EntityAuditAction.ENTITY_UPDATE); @@ -76,10 +83,14 @@ public class EntityAuditListener implements EntityChangeListener { } auditRepository.putEvents(events); + + RequestContextV1.get().endMetricRecord(metric); } @Override public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("entityAudit"); + if (traits != null) { for (IStruct trait : traits) { EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD, @@ -88,10 +99,14 @@ public class EntityAuditListener implements EntityChangeListener { auditRepository.putEvents(event); } } + + RequestContextV1.get().endMetricRecord(metric); } @Override public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("entityAudit"); + if (traitNames != null) { for (String traitName : traitNames) { EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); @@ -99,10 +114,14 @@ public class EntityAuditListener implements EntityChangeListener { auditRepository.putEvents(event); } } + + RequestContextV1.get().endMetricRecord(metric); } @Override public void onTraitsUpdated(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("entityAudit"); + if (traits != null) { for (IStruct trait : traits) { EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_UPDATE, @@ -111,10 +130,14 @@ public class EntityAuditListener implements EntityChangeListener { auditRepository.putEvents(event); } } + + RequestContextV1.get().endMetricRecord(metric); } @Override public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("entityAudit"); + List<EntityAuditEvent> events = new ArrayList<>(); for (ITypedReferenceableInstance entity : entities) { EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_DELETE : EntityAuditAction.ENTITY_DELETE, "Deleted entity"); @@ -122,6 +145,8 @@ public class EntityAuditListener implements EntityChangeListener { } auditRepository.putEvents(events); + + RequestContextV1.get().endMetricRecord(metric); } public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{ http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java index 9bde5db..b4d399a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.CreateUpdateEntitiesResult; +import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; @@ -99,17 +100,18 @@ public class AtlasInstanceConverter { } public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity) throws AtlasBaseException { - try { - return metadataService.getEntityDefinition(entity.getGuid()); - } catch (AtlasException e) { - LOG.error("Exception while getting a typed reference for the entity ", e); - throw toAtlasBaseException(e); - } + return getITypedReferenceable(entity.getGuid()); } public ITypedReferenceableInstance getITypedReferenceable(String guid) throws AtlasBaseException { try { - return metadataService.getEntityDefinition(guid); + ITypedReferenceableInstance ret = RequestContext.get().getInstanceV1(guid); + + if (ret == null) { + ret = metadataService.getEntityDefinition(guid); + } + + return ret; } catch (AtlasException e) { LOG.error("Exception while getting a typed reference for the entity ", e); throw toAtlasBaseException(e); http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java index 5b5158c..84cd999 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java @@ -26,6 +26,13 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasBuiltInTypes; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -37,6 +44,7 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -52,17 +60,19 @@ public class FullTextMapperV2 { private static final String FULL_TEXT_FOLLOW_REFERENCES = "atlas.search.fulltext.followReferences"; private static final String FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.search.fulltext.type"; + private final AtlasTypeRegistry typeRegistry; + private final Configuration configuration; private final EntityGraphRetriever entityGraphRetriever; private final boolean followReferences; private final Map<String, Set<String>> excludeAttributesCache = new HashMap<>(); - private Configuration APPLICATION_PROPERTIES = null; @Inject public FullTextMapperV2(AtlasTypeRegistry typeRegistry, Configuration configuration) { - entityGraphRetriever = new EntityGraphRetriever(typeRegistry); - APPLICATION_PROPERTIES = configuration; - followReferences = APPLICATION_PROPERTIES != null && APPLICATION_PROPERTIES.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false); + this.typeRegistry = typeRegistry; + this.configuration = configuration; + entityGraphRetriever = new EntityGraphRetriever(typeRegistry); + followReferences = this.configuration != null && this.configuration.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false); } /** @@ -89,11 +99,12 @@ public class FullTextMapperV2 { if (CollectionUtils.isNotEmpty(classifications)) { for (AtlasClassification classification : classifications) { - sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER); + final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName()); + final Set<String> excludeAttributes = getExcludeAttributesForIndexText(classification.getTypeName()); - Set<String> excludeAttributes = getExcludeAttributesForIndexText(classification.getTypeName()); + sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER); - mapAttributes(classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes); + mapAttributes(classificationType, classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes); } } @@ -109,12 +120,23 @@ public class FullTextMapperV2 { public String getIndexTextForEntity(String guid) throws AtlasBaseException { String ret = null; - AtlasEntity entity = getAndCacheEntity(guid); + final AtlasEntity entity; + final AtlasEntityExtInfo entityExtInfo; + + if (followReferences) { + AtlasEntityWithExtInfo entityWithExtInfo = getAndCacheEntityWithExtInfo(guid); + + entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; + entityExtInfo = entityWithExtInfo; + } else { + entity = getAndCacheEntity(guid); + entityExtInfo = null; + } if (entity != null) { StringBuilder sb = new StringBuilder(); - map(entity, null, sb, new HashSet<String>()); + map(entity, entityExtInfo, sb, new HashSet<String>()); ret = sb.toString(); } @@ -131,27 +153,29 @@ public class FullTextMapperV2 { return; } + final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + final Set<String> excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName()); + processedGuids.add(entity.getGuid()); sb.append(entity.getTypeName()).append(FULL_TEXT_DELIMITER); - Set<String> excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName()); - - mapAttributes(entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes); + mapAttributes(entityType, entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes); - List<AtlasClassification> classifications = entity.getClassifications(); + final List<AtlasClassification> classifications = entity.getClassifications(); if (CollectionUtils.isNotEmpty(classifications)) { for (AtlasClassification classification : classifications) { - sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER); + final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName()); + final Set<String> excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName()); - Set<String> excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName()); + sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER); - mapAttributes(classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes); + mapAttributes(classificationType, classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes); } } } - private void mapAttributes(Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb, + private void mapAttributes(AtlasStructType structType, Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb, Set<String> processedGuids, Set<String> excludeAttributes) throws AtlasBaseException { if (MapUtils.isEmpty(attributes)) { return; @@ -161,10 +185,27 @@ public class FullTextMapperV2 { String attribKey = attributeEntry.getKey(); Object attrValue = attributeEntry.getValue(); - if (attrValue == null || isExcludedAttribute(excludeAttributes, attribKey)) { + if (attrValue == null || excludeAttributes.contains(attribKey)) { continue; } + if (!followReferences) { + AtlasAttribute attribute = structType != null ? structType.getAttribute(attribKey) : null; + AtlasType attributeType = attribute != null ? attribute.getAttributeType() : null; + + if (attributeType == null) { + continue; + } + + if (attributeType instanceof AtlasArrayType) { + attributeType = ((AtlasArrayType) attributeType).getElementType(); + } + + if (attributeType instanceof AtlasEntityType || attributeType instanceof AtlasBuiltInTypes.AtlasObjectIdType) { + continue; + } + } + sb.append(attribKey).append(FULL_TEXT_DELIMITER); mapAttribute(attrValue, entityExtInfo, sb, processedGuids); @@ -247,25 +288,25 @@ public class FullTextMapperV2 { return entityWithExtInfo; } - private boolean isExcludedAttribute(Set<String> excludeAttributes, String attributeName) { - return CollectionUtils.isNotEmpty(excludeAttributes) && excludeAttributes.contains(attributeName); - } - private Set<String> getExcludeAttributesForIndexText(String typeName) { - Set<String> ret = null; + final Set<String> ret; if (excludeAttributesCache.containsKey(typeName)) { ret = excludeAttributesCache.get(typeName); - } else if (APPLICATION_PROPERTIES != null) { - String[] excludeAttributes = APPLICATION_PROPERTIES.getStringArray(FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY + "." + + } else if (configuration != null) { + String[] excludeAttributes = configuration.getStringArray(FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY + "." + typeName + "." + "attributes.exclude"); if (ArrayUtils.isNotEmpty(excludeAttributes)) { ret = new HashSet<>(Arrays.asList(excludeAttributes)); + } else { + ret = Collections.emptySet(); } excludeAttributesCache.put(typeName, ret); + } else { + ret = Collections.emptySet(); } return ret; http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index 92d95f9..054ebd1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -20,8 +20,10 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; @@ -186,6 +188,8 @@ public class AtlasEntityChangeNotifier { } private List<ITypedReferenceableInstance> toITypedReferenceable(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("notification-getReferenceable"); + List<ITypedReferenceableInstance> ret = new ArrayList<>(entityHeaders.size()); // delete notifications need only entity-guid. In case of hard-delete, getITypedReferenceable() call below will @@ -200,6 +204,8 @@ public class AtlasEntityChangeNotifier { } } + RequestContextV1.get().endMetricRecord(metric); + return ret; } @@ -242,6 +248,8 @@ public class AtlasEntityChangeNotifier { LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping"); } + MetricRecorder metric = RequestContextV1.get().startMetricRecord("fullTextMapping"); + for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) { String guid = atlasEntityHeader.getGuid(); AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(guid); @@ -258,6 +266,8 @@ public class AtlasEntityChangeNotifier { LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e); } } + + RequestContextV1.get().endMetricRecord(metric); } private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) { http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java index 12e8bb1..66a92fa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java @@ -18,7 +18,9 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; @@ -127,6 +129,8 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { protected void discover() throws AtlasBaseException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("walkEntityGraph"); + EntityStream entityStream = discoveryContext.getEntityStream(); Set<String> walkedEntities = new HashSet<>(); @@ -162,9 +166,13 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { walkedEntities.add(entity.getGuid()); } } + + RequestContextV1.get().endMetricRecord(metric); } protected void resolveReferences() throws AtlasBaseException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("resolveReferences"); + EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry), new UniqAttrBasedEntityResolver(typeRegistry) }; @@ -172,6 +180,8 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { for (EntityResolver resolver : entityResolvers) { resolver.resolveEntityReferences(discoveryContext); } + + RequestContextV1.get().endMetricRecord(metric); } private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException { http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index f578ded..1d89c83 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -23,6 +23,7 @@ import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContextV1; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.model.instance.AtlasCheckStateRequest; import org.apache.atlas.model.instance.AtlasCheckStateResult; import org.apache.atlas.model.instance.AtlasClassification; @@ -218,6 +219,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); } + MetricRecorder metric = RequestContextV1.get().startMetricRecord("createOrUpdate"); + // Create/Update entities EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate); @@ -232,6 +235,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { // Notify the change listeners entityChangeNotifier.onEntitiesMutated(ret, entityStream instanceof EntityImportStream); + RequestContextV1.get().endMetricRecord(metric); + return ret; } @@ -606,6 +611,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("preCreateOrUpdate"); + EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream); EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities(); EntityMutationContext context = new EntityMutationContext(discoveryContext); @@ -668,6 +675,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } } + RequestContextV1.get().endMetricRecord(metric); + return context; } http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java index 65c5d41..03d94ce 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java @@ -20,8 +20,10 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.discovery.SearchProcessor; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; @@ -245,6 +247,8 @@ public class AtlasGraphUtilsV1 { } public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("findByUniqueAttributes"); + AtlasVertex vertex = null; final Map<String, AtlasAttribute> uniqueAttributes = entityType.getUniqAttributes(); @@ -278,6 +282,8 @@ public class AtlasGraphUtilsV1 { } } + RequestContextV1.get().endMetricRecord(metric); + return vertex; } @@ -309,28 +315,36 @@ public class AtlasGraphUtilsV1 { } public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("findByTypeAndPropertyName"); + AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName) - .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()) - .has(propertyName, attrVal); + .has(propertyName, attrVal) + .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()); Iterator<AtlasVertex> results = query.vertices().iterator(); AtlasVertex vertex = results.hasNext() ? results.next() : null; + RequestContextV1.get().endMetricRecord(metric); + return vertex; } public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("findBySuperTypeAndPropertyName"); + AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() .has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName) - .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()) - .has(propertyName, attrVal); + .has(propertyName, attrVal) + .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()); Iterator<AtlasVertex> results = query.vertices().iterator(); AtlasVertex vertex = results.hasNext() ? results.next() : null; + RequestContextV1.get().endMetricRecord(metric); + return vertex; } http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index e818ed5..a9bf55a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -24,6 +24,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; @@ -135,6 +136,8 @@ public class EntityGraphMapper { } public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException { + MetricRecorder metric = RequestContextV1.get().startMetricRecord("mapAttributesAndClassifications"); + EntityMutationResponse resp = new EntityMutationResponse(); Collection<AtlasEntity> createdEntities = context.getCreatedEntities(); @@ -189,6 +192,8 @@ public class EntityGraphMapper { } } + RequestContextV1.get().endMetricRecord(metric); + return resp; } http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/server-api/src/main/java/org/apache/atlas/RequestContextV1.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java index b85769c..5ae8055 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java @@ -19,6 +19,7 @@ package org.apache.atlas; import org.apache.atlas.metrics.Metrics; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.store.DeleteType; @@ -34,7 +35,7 @@ import java.util.Map; import java.util.Set; public class RequestContextV1 { - private static final Logger LOG = LoggerFactory.getLogger(RequestContextV1.class); + private static final Logger METRICS = LoggerFactory.getLogger("METRICS"); private static final ThreadLocal<RequestContextV1> CURRENT_CONTEXT = new ThreadLocal<>(); private static final Set<RequestContextV1> ACTIVE_REQUESTS = new HashSet<>(); @@ -91,6 +92,12 @@ public class RequestContextV1 { if (this.entityGuidInRequest != null) { this.entityGuidInRequest.clear(); } + + if (METRICS.isDebugEnabled() && !metrics.isEmpty()) { + METRICS.debug(metrics.toString()); + } + + metrics.clear(); } public static int getActiveRequestsCount() { @@ -181,11 +188,14 @@ public class RequestContextV1 { return deletedEntities.containsKey(guid); } + public MetricRecorder startMetricRecord(String name) { return metrics.getMetricRecorder(name); } + + public void endMetricRecord(MetricRecorder recorder) { metrics.recordMetric(recorder); } + public static Metrics getMetrics() { return get().metrics; } - public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) { if (entityGuidInRequest == null) { entityGuidInRequest = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java b/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java index e0f4e49..6bb8c2a 100644 --- a/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java +++ b/server-api/src/main/java/org/apache/atlas/metrics/Metrics.java @@ -18,51 +18,108 @@ package org.apache.atlas.metrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; public class Metrics { - public static class Counters { - private short invocations = 0; - private long totalTimeMSecs = 0; + private static final Logger METRICS = LoggerFactory.getLogger("METRICS"); - @Override - public String toString() { - return "[count=" + invocations + ", totalTimeMSec=" + totalTimeMSecs + "]"; - } + private final Map<String, Metric> metrics = new LinkedHashMap<>(); - public short getInvocations() { - return invocations; - } - public long getTotalTimeMSecs() { - return totalTimeMSecs; - } + public MetricRecorder getMetricRecorder(String name) { + return METRICS.isDebugEnabled() ? new MetricRecorder(name) : null; } - Map<String, Counters> countersMap = new LinkedHashMap<>(); + public void recordMetric(MetricRecorder recorder) { + if (recorder != null) { + final String name = recorder.name; + final long timeTaken = recorder.getElapsedTime(); + + Metric metric = metrics.get(name); + + if (metric == null) { + metric = new Metric(name); + + metrics.put(name, metric); + } - public void record(String name, long timeMsecs) { - Counters counter = countersMap.get(name); - if (counter == null) { - counter = new Counters(); - countersMap.put(name, counter); + metric.invocations++; + metric.totalTimeMSecs += timeTaken; } + } + + public void clear() { + metrics.clear(); + } + + public boolean isEmpty() { + return metrics.isEmpty(); + } + + public Set<String> getMetricsNames() { + return metrics.keySet(); + } - counter.invocations++; - counter.totalTimeMSecs += timeMsecs; + public Metric getMetric(String name) { + return metrics.get(name); } @Override public String toString() { - return countersMap.toString(); + StringBuilder sb = new StringBuilder(); + + sb.append("{"); + + if (!metrics.isEmpty()) { + for (Metric metric : metrics.values()) { + sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},"); + } + + sb.setLength(sb.length() - 1); // remove last "," + } + + sb.append("}"); + + return sb.toString(); } - public boolean isEmpty() { - return countersMap.isEmpty(); + public class MetricRecorder { + private final String name; + private final long startTimeMs = System.currentTimeMillis(); + + MetricRecorder(String name) { + this.name = name; + } + + long getElapsedTime() { + return System.currentTimeMillis() - startTimeMs; + } } - public Counters getCounters(String name) { - return countersMap.get(name); + public static class Metric { + private final String name; + private short invocations = 0; + private long totalTimeMSecs = 0; + + public Metric(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public short getInvocations() { + return invocations; + } + + public long getTotalTimeMSecs() { + return totalTimeMSecs; + } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index 926acfb..02bfccc 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -20,7 +20,9 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.metrics.Metrics.MetricRecorder; import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.entity.EntityNotificationImpl; import org.apache.atlas.repository.converters.AtlasInstanceConverter; @@ -30,7 +32,6 @@ import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.FieldMapping; @@ -178,6 +179,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener { EntityNotification.OperationType operationType) throws AtlasException { List<EntityNotification> messages = new LinkedList<>(); + MetricRecorder metric = RequestContextV1.get().startMetricRecord("entityNotification"); + for (IReferenceableInstance entityDefinition : entityDefinitions) { if(GraphHelper.isInternalType(entityDefinition.getTypeName())) { continue; @@ -207,6 +210,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener { if (!messages.isEmpty()) { notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages); } + + RequestContextV1.get().endMetricRecord(metric); } private List<String> getNotificationAttributes(String entityType) { http://git-wip-us.apache.org/repos/asf/atlas/blob/900f99bb/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index a26ab8b..2d2a6fb 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -93,6 +93,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class); private static final String LOCALHOST = "localhost"; private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); + private static Logger LARGE_MESSAGES_LOG = LoggerFactory.getLogger("LARGE_MESSAGES"); private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage"; private static final String ATTRIBUTE_INPUTS = "inputs"; @@ -123,6 +124,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final int failedMsgCacheSize; private final boolean skipHiveColumnLineageHive20633; private final int skipHiveColumnLineageHive20633InputsThreshold; + private final int largeMessageProcessingTimeThresholdMs; private final boolean consumerDisabled; @VisibleForTesting @@ -157,6 +159,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true); skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 + largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); @@ -587,6 +590,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl commit(kafkaMsg); } finally { AtlasPerfTracer.log(perf); + + long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0; + + if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) { + String strMessage = AbstractNotification.getMessageJson(message); + + LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset()); + LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage); + } } }