This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 2369e7c8e ATLAS-4881: minor improvements in notification processing
2369e7c8e is described below

commit 2369e7c8e0ad63a6dc173713823cdca9bf3b9b35
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Tue Jun 18 00:18:19 2024 -0700

    ATLAS-4881: minor improvements in notification processing
---
 .../org/apache/atlas/utils/AtlasPerfMetrics.java   |  9 ++++-
 .../notification/NotificationHookConsumer.java     | 41 ++++++++++++++++------
 .../preprocessor/HiveDbDDLPreprocessor.java        |  5 ++-
 .../preprocessor/HivePreprocessor.java             |  4 ++-
 .../preprocessor/HiveTableDDLPreprocessor.java     |  5 ++-
 .../preprocessor/PreprocessorContext.java          | 10 +++---
 6 files changed, 56 insertions(+), 18 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
index c72b2c3e2..a65dc8606 100644
--- a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 public class AtlasPerfMetrics {
     private final Map<String, Metric> metrics = new LinkedHashMap<>();
+    private long startTimeMs = -1;
 
 
     public MetricRecorder getMetricRecorder(String name) {
@@ -36,6 +37,10 @@ public class AtlasPerfMetrics {
             final String name      = recorder.name;
             final long   timeTaken = recorder.getElapsedTime();
 
+            if (startTimeMs == -1) {
+                startTimeMs = System.currentTimeMillis();
+            }
+
             Metric metric = metrics.get(name);
 
             if (metric == null) {
@@ -51,6 +56,8 @@ public class AtlasPerfMetrics {
 
     public void clear() {
         metrics.clear();
+
+        startTimeMs = -1;
     }
 
     public boolean isEmpty() {
@@ -76,7 +83,7 @@ public class AtlasPerfMetrics {
                 sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},");
             }
 
-            sb.setLength(sb.length() - 1); // remove last ","
+            sb.append("\"totalTime\":").append(System.currentTimeMillis() - startTimeMs);
         }
 
         sb.append("}");
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 7b02ac449..86535275b 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -84,6 +84,7 @@ import org.springframework.security.core.userdetails.UserDetails;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
+import javax.ws.rs.core.Response;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -932,18 +933,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         RequestContext.get().resetEntityGuidUpdates();
                         exceptionClassName = e.getClass().getSimpleName();

-                        if (numRetries == (maxRetries - 1)) {
-                            String strMessage = AbstractNotification.getMessageJson(message);
+                        // don't retry in following conditions:
+                        //  1. number of retry attempts reached configured count
+                        //  2. notification processing failed due to invalid data (non-existing type, entity, ..)
+                        boolean        maxRetriesReached    = numRetries == (maxRetries - 1);
+                        AtlasErrorCode errorCode            = (e instanceof AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null;
+                        boolean        unrecoverableFailure = errorCode != null && (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) || Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode()));

-                            LOG.warn("Max retries exceeded for message {}", strMessage, e);
+                        if (maxRetriesReached || unrecoverableFailure) {
+                            try {
+                                String strMessage = AbstractNotification.getMessageJson(message);
+
+                                if (unrecoverableFailure) {
+                                    LOG.warn("Unrecoverable failure while processing message {}", strMessage, e);
+                                } else {
+                                    LOG.warn("Max retries exceeded for message {}", strMessage, e);
+                                }

-                            stats.isFailedMsg = true;
+                                stats.isFailedMsg = true;

-                            failedMessages.add(strMessage);
+                                failedMessages.add(strMessage);

-                            if (failedMessages.size() >= failedMsgCacheSize) {
-                                recordFailedMessages();
+                                if (failedMessages.size() >= failedMsgCacheSize) {
+                                    recordFailedMessages();
+                                }
+                            } catch (Throwable t) {
+                                LOG.warn("error while recording failed message: type={}, topic={}, partition={}, offset={}", message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
                             }
+
                             return;
                         } else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
                             LOG.warn("{}: Continuing: {}", exceptionClassName, e.getMessage());
@@ -978,10 +995,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                 metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);

                 if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
-                    String strMessage = AbstractNotification.getMessageJson(message);
+                    try {
+                        String strMessage = AbstractNotification.getMessageJson(message);

-                    LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
-                    LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+                        LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
+                        LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+                    } catch (Throwable t) {
+                        LOG.warn("error while recording large message: msgProcessingTime={}, type={}, topic={}, partition={}, offset={}", stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
+                    }
                 }
 
                 if (auditLog != null) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
index dcff0939d..a59590ca4 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
@@ -47,6 +47,9 @@ public class HiveDbDDLPreprocessor extends EntityPreprocessor {
         }

         setObjectIdWithGuid(dbObject, guid);
-        LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
+        }
     }
 }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 083e343b0..d2aab310e 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -173,7 +173,9 @@ public class HivePreprocessor {
                 Object qualifiedName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);

                 if (!Objects.equals(name, qualifiedName)) {
-                    LOG.info("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
+                    }
 
                     entity.setAttribute(ATTRIBUTE_NAME, qualifiedName);
                 }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
index 83d4d7c1a..383543457 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
@@ -47,6 +47,9 @@ public class HiveTableDDLPreprocessor extends EntityPreprocessor {
         }

         setObjectIdWithGuid(tableObject, guid);
-        LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
+        }
     }
 }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index f930d9f35..d25c32bfb 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -365,10 +365,12 @@ public class PreprocessorContext {
                 }
             }
 
-            if (firstEntity != null) {
-                LOG.info("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
-            } else {
-                LOG.info("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition());
+            if (LOG.isDebugEnabled()) {
+                if (firstEntity != null) {
+                    LOG.debug("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                } else {
+                    LOG.debug("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                }
+                }
             }
 
             referredEntitiesToMove.clear();
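
Note: a minimal standalone sketch of the retry-skip check added above in NotificationHookConsumer. It assumes, as the diff does, that AtlasErrorCode.getHttpCode() returns a javax.ws.rs.core.Response.Status; the class and method names in the sketch are illustrative only, not part of this commit.

    import javax.ws.rs.core.Response;

    import org.apache.atlas.AtlasErrorCode;
    import org.apache.atlas.exception.AtlasBaseException;

    // Sketch: decide whether processing of a failed notification should not be retried.
    public class RetryDecisionSketch {
        public static boolean shouldGiveUp(Exception e, int numRetries, int maxRetries) {
            // 1. configured number of retry attempts has been reached
            boolean maxRetriesReached = numRetries == (maxRetries - 1);

            // 2. failure is due to invalid data (HTTP 404/400): retrying the same message cannot succeed
            AtlasErrorCode errorCode = (e instanceof AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null;

            boolean unrecoverableFailure = errorCode != null &&
                    (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) ||
                     Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode()));

            return maxRetriesReached || unrecoverableFailure;
        }
    }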
