This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 2369e7c8e ATLAS-4881: minor improvements in notification processing 2369e7c8e is described below commit 2369e7c8e0ad63a6dc173713823cdca9bf3b9b35 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Tue Jun 18 00:18:19 2024 -0700 ATLAS-4881: minor improvements in notification processing --- .../org/apache/atlas/utils/AtlasPerfMetrics.java | 9 ++++- .../notification/NotificationHookConsumer.java | 41 ++++++++++++++++------ .../preprocessor/HiveDbDDLPreprocessor.java | 5 ++- .../preprocessor/HivePreprocessor.java | 4 ++- .../preprocessor/HiveTableDDLPreprocessor.java | 5 ++- .../preprocessor/PreprocessorContext.java | 10 +++--- 6 files changed, 56 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java index c72b2c3e2..a65dc8606 100644 --- a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java +++ b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java @@ -25,6 +25,7 @@ import java.util.Set; public class AtlasPerfMetrics { private final Map<String, Metric> metrics = new LinkedHashMap<>(); + private long startTimeMs = -1; public MetricRecorder getMetricRecorder(String name) { @@ -36,6 +37,10 @@ public class AtlasPerfMetrics { final String name = recorder.name; final long timeTaken = recorder.getElapsedTime(); + if (startTimeMs == -1) { + startTimeMs = System.currentTimeMillis(); + } + Metric metric = metrics.get(name); if (metric == null) { @@ -51,6 +56,8 @@ public class AtlasPerfMetrics { public void clear() { metrics.clear(); + + startTimeMs = -1; } public boolean isEmpty() { @@ -76,7 +83,7 @@ public class AtlasPerfMetrics { sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},"); } - sb.setLength(sb.length() - 1); // remove last "," + sb.append("\"totalTime\":").append(System.currentTimeMillis() - startTimeMs); } sb.append("}"); diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 7b02ac449..86535275b 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -84,6 +84,7 @@ import org.springframework.security.core.userdetails.UserDetails; import org.springframework.stereotype.Component; import javax.inject.Inject; +import javax.ws.rs.core.Response; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -932,18 +933,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl RequestContext.get().resetEntityGuidUpdates(); exceptionClassName = e.getClass().getSimpleName(); - if (numRetries == (maxRetries - 1)) { - String strMessage = AbstractNotification.getMessageJson(message); + // don't retry in following conditions: + // 1. number of retry attempts reached configured count + // 2. notification processing failed due to invalid data (non-existing type, entity, ..) + boolean maxRetriesReached = numRetries == (maxRetries - 1); + AtlasErrorCode errorCode = (e instanceof AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null; + boolean unrecoverableFailure = errorCode != null && (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) || Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode())); - LOG.warn("Max retries exceeded for message {}", strMessage, e); + if (maxRetriesReached || unrecoverableFailure) { + try { + String strMessage = AbstractNotification.getMessageJson(message); + + if (unrecoverableFailure) { + LOG.warn("Unrecoverable failure while processing message {}", strMessage, e); + } else { + LOG.warn("Max retries exceeded for message {}", strMessage, e); + } - stats.isFailedMsg = true; + stats.isFailedMsg = true; - failedMessages.add(strMessage); + failedMessages.add(strMessage); - if (failedMessages.size() >= failedMsgCacheSize) { - recordFailedMessages(); + if (failedMessages.size() >= failedMsgCacheSize) { + recordFailedMessages(); + } + } catch (Throwable t) { + LOG.warn("error while recording failed message: type={}, topic={}, partition={}, offset={}", message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t); } + return; } else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) { LOG.warn("{}: Continuing: {}", exceptionClassName, e.getMessage()); @@ -978,10 +995,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats); if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) { - String strMessage = AbstractNotification.getMessageJson(message); + try { + String strMessage = AbstractNotification.getMessageJson(message); - LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset()); - LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage); + LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset()); + LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage); + } catch (Throwable t) { + LOG.warn("error while recording large message: msgProcessingTime={}, type={}, topic={}, partition={}, offset={}", stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t); + } } if (auditLog != null) { diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java index dcff0939d..a59590ca4 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java @@ -47,6 +47,9 @@ public class HiveDbDDLPreprocessor extends EntityPreprocessor { } setObjectIdWithGuid(dbObject, guid); - LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid); + } } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java index 083e343b0..d2aab310e 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java @@ -173,7 +173,9 @@ public class HivePreprocessor { Object qualifiedName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); if (!Objects.equals(name, qualifiedName)) { - LOG.info("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition()); + if (LOG.isDebugEnabled()) { + LOG.debug("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition()); + } entity.setAttribute(ATTRIBUTE_NAME, qualifiedName); } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java index 83d4d7c1a..383543457 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java @@ -47,6 +47,9 @@ public class HiveTableDDLPreprocessor extends EntityPreprocessor { } setObjectIdWithGuid(tableObject, guid); - LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid); + } } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index f930d9f35..d25c32bfb 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -365,10 +365,12 @@ public class PreprocessorContext { } } - if (firstEntity != null) { - LOG.info("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition()); - } else { - LOG.info("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition()); + if (LOG.isDebugEnabled()) { + if (firstEntity != null) { + LOG.debug("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition()); + } else { + LOG.debug("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition()); + } } referredEntitiesToMove.clear();