This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new e8a6125ba RANGER-4130: Improve performance of event processing in 
tagsync by optimizing number of commits to Kafka broker
e8a6125ba is described below

commit e8a6125ba99b5ca4f62923552ddb251ee476cfdd
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Tue Apr 18 18:07:32 2023 -0700

    RANGER-4130: Improve performance of event processing in tagsync by 
optimizing number of commits to Kafka broker
---
 .../tagsync/source/atlas/AtlasTagSource.java       | 68 ++++++++++------------
 1 file changed, 32 insertions(+), 36 deletions(-)

diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index a618cc986..34a39f73c 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -52,9 +52,9 @@ public class AtlasTagSource extends AbstractTagSource {
 
        public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = 
"atlas-application.properties";
 
-       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.kafka.bootstrap.servers";
-       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.kafka.zookeeper.connect";
-       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.kafka.entities.group.id";
+       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS      = 
"atlas.kafka.bootstrap.servers";
+       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT   = 
"atlas.kafka.zookeeper.connect";
+       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP       = 
"atlas.kafka.entities.group.id";
 
        public static final int    MAX_WAIT_TIME_IN_MILLIS = 1000;
 
@@ -168,11 +168,10 @@ public class AtlasTagSource extends AbstractTagSource {
 
                private final List<RangerAtlasEntityWithTags>             
atlasEntitiesWithTags = new ArrayList<>();
                private final List<AtlasKafkaMessage<EntityNotification>> 
messages              = new ArrayList<>();
+               private       AtlasKafkaMessage<EntityNotification>       
lastUnhandledMessage  = null;
 
-               private long    offsetOfLastMessageDeliveredToRanger = -1L;
                private long    offsetOfLastMessageCommittedToKafka  = -1L;
-
-               private boolean isHandlingDeleteOps   = false;
+               private boolean isHandlingDeleteOps                  = false;
 
                private 
ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
                        this.consumer = consumer;
@@ -222,10 +221,11 @@ public class AtlasTagSource extends AbstractTagSource {
                                                                                
}
 
                                                                                
atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper));
+                                                                               
messages.add(message);
                                                                        } else {
                                                                                
AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper);
+                                                                               
lastUnhandledMessage = message;
                                                                        }
-                                                                       
messages.add(message);
                                                                }
                                                        } else {
                                                                LOG.error("Null 
entityNotification received from Kafka!! Ignoring..");
@@ -235,6 +235,10 @@ public class AtlasTagSource extends AbstractTagSource {
                                                        
buildAndUploadServiceTags();
                                                }
                                        }
+                                       if (lastUnhandledMessage != null) {
+                                               
commitToKafka(lastUnhandledMessage);
+                                               lastUnhandledMessage = null;
+                                       }
 
                                } catch (Exception exception) {
                                        LOG.error("Caught exception..: ", 
exception);
@@ -255,9 +259,7 @@ public class AtlasTagSource extends AbstractTagSource {
                                LOG.debug("==> buildAndUploadServiceTags()");
                        }
 
-                       if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
-
-                               commitToKafka();
+                       if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags) 
&& CollectionUtils.isNotEmpty(messages)) {
 
                                Map<String, ServiceTags> serviceTagsMap = 
AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
 
@@ -284,17 +286,16 @@ public class AtlasTagSource extends AbstractTagSource {
                                        }
                                }
 
-                               offsetOfLastMessageDeliveredToRanger = 
messages.get(messages.size() - 1).getOffset();
+                               AtlasKafkaMessage<EntityNotification> 
latestMessageDeliveredToRanger       = messages.get(messages.size() - 1);
+                               commitToKafka(latestMessageDeliveredToRanger);
+
+                               atlasEntitiesWithTags.clear();
+                               messages.clear();
 
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Completed processing batch 
of messages of size:[" + messages.size() + "] received from 
NotificationConsumer");
                                }
 
-                               commitToKafka();
-
-                               atlasEntitiesWithTags.clear();
-                               messages.clear();
-
                        }
 
                        if (LOG.isDebugEnabled()) {
@@ -302,34 +303,29 @@ public class AtlasTagSource extends AbstractTagSource {
                        }
                }
 
-               private void commitToKafka() {
+               private void 
commitToKafka(AtlasKafkaMessage<EntityNotification> messageToCommit) {
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> commitToKafka()");
+                               LOG.debug("==> commitToKafka(" + 
messageToCommit + ")");
                        }
 
-                       for (AtlasKafkaMessage<EntityNotification> message : 
messages) {
-                               if (message.getOffset() > 
offsetOfLastMessageCommittedToKafka) {
-                                       if (message.getOffset() <= 
offsetOfLastMessageDeliveredToRanger) {
-                                               // Already delivered to Ranger
-                                               TopicPartition partition = new 
TopicPartition("ATLAS_ENTITIES", message.getPartition());
-                                               try {
-                                                       if 
(LOG.isDebugEnabled()) {
-                                                               
LOG.debug("Committing message with offset:[" + message.getOffset() + "] to 
Kafka");
-                                                       }
-                                                       
consumer.commit(partition, message.getOffset());
-                                                       
offsetOfLastMessageCommittedToKafka = message.getOffset();
-                                               } catch (Exception 
commitException) {
-                                                       LOG.warn("Ranger 
tagsync already processed message at offset " + message.getOffset() + ". 
Ignoring failure in committing this message and continuing to process next 
message", commitException);
-                                                       LOG.warn("This will 
cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! 
This may be unrecoverable error!!");
-                                               }
-                                       } else {
-                                               break;
+                       long messageOffset = messageToCommit.getOffset();
+                       int  partitionId   = messageToCommit.getPartition();
+
+                       if (offsetOfLastMessageCommittedToKafka < 
messageOffset) {
+                               TopicPartition partition = new 
TopicPartition("ATLAS_ENTITIES", partitionId);
+                               try {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Committing message 
with offset:[" + messageOffset + "] to Kafka");
                                        }
+                                       consumer.commit(partition, 
messageOffset);
+                                       offsetOfLastMessageCommittedToKafka = 
messageOffset;
+                               } catch (Exception commitException) {
+                                       LOG.warn("Ranger tagsync already 
processed message at offset " + messageOffset + ". Ignoring failure in 
committing message:[" + messageToCommit + "]", commitException);
                                }
                        }
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("<== commitToKafka()");
+                               LOG.debug("<== commitToKafka(" + 
messageToCommit + ")");
                        }
                }
        }

Reply via email to