This is an automated email from the ASF dual-hosted git repository. abhay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push: new e8a6125ba RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker e8a6125ba is described below commit e8a6125ba99b5ca4f62923552ddb251ee476cfdd Author: Abhay Kulkarni <ab...@apache.org> AuthorDate: Tue Apr 18 18:07:32 2023 -0700 RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker --- .../tagsync/source/atlas/AtlasTagSource.java | 68 ++++++++++------------ 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index a618cc986..34a39f73c 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -52,9 +52,9 @@ public class AtlasTagSource extends AbstractTagSource { public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "atlas-application.properties"; - public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers"; - public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect"; - public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id"; + public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers"; + public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect"; + public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id"; public static final int MAX_WAIT_TIME_IN_MILLIS = 1000; @@ -168,11 +168,10 @@ public class AtlasTagSource extends AbstractTagSource { private final List<RangerAtlasEntityWithTags> atlasEntitiesWithTags = new ArrayList<>(); private final List<AtlasKafkaMessage<EntityNotification>> 
messages = new ArrayList<>(); + private AtlasKafkaMessage<EntityNotification> lastUnhandledMessage = null; - private long offsetOfLastMessageDeliveredToRanger = -1L; private long offsetOfLastMessageCommittedToKafka = -1L; - - private boolean isHandlingDeleteOps = false; + private boolean isHandlingDeleteOps = false; private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) { this.consumer = consumer; @@ -222,10 +221,11 @@ public class AtlasTagSource extends AbstractTagSource { } atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper)); + messages.add(message); } else { AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper); + lastUnhandledMessage = message; } - messages.add(message); } } else { LOG.error("Null entityNotification received from Kafka!! Ignoring.."); @@ -235,6 +235,10 @@ public class AtlasTagSource extends AbstractTagSource { buildAndUploadServiceTags(); } } + if (lastUnhandledMessage != null) { + commitToKafka(lastUnhandledMessage); + lastUnhandledMessage = null; + } } catch (Exception exception) { LOG.error("Caught exception..: ", exception); @@ -255,9 +259,7 @@ public class AtlasTagSource extends AbstractTagSource { LOG.debug("==> buildAndUploadServiceTags()"); } - if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) { - - commitToKafka(); + if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags) && CollectionUtils.isNotEmpty(messages)) { Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags); @@ -284,17 +286,16 @@ public class AtlasTagSource extends AbstractTagSource { } } - offsetOfLastMessageDeliveredToRanger = messages.get(messages.size() - 1).getOffset(); + AtlasKafkaMessage<EntityNotification> latestMessageDeliveredToRanger = messages.get(messages.size() - 1); + commitToKafka(latestMessageDeliveredToRanger); + + atlasEntitiesWithTags.clear(); + messages.clear(); if (LOG.isDebugEnabled()) { LOG.debug("Completed processing batch of 
messages of size:[" + messages.size() + "] received from NotificationConsumer"); } - commitToKafka(); - - atlasEntitiesWithTags.clear(); - messages.clear(); - } if (LOG.isDebugEnabled()) { @@ -302,34 +303,29 @@ public class AtlasTagSource extends AbstractTagSource { } } - private void commitToKafka() { + private void commitToKafka(AtlasKafkaMessage<EntityNotification> messageToCommit) { if (LOG.isDebugEnabled()) { - LOG.debug("==> commitToKafka()"); + LOG.debug("==> commitToKafka(" + messageToCommit + ")"); } - for (AtlasKafkaMessage<EntityNotification> message : messages) { - if (message.getOffset() > offsetOfLastMessageCommittedToKafka) { - if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) { - // Already delivered to Ranger - TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Committing message with offset:[" + message.getOffset() + "] to Kafka"); - } - consumer.commit(partition, message.getOffset()); - offsetOfLastMessageCommittedToKafka = message.getOffset(); - } catch (Exception commitException) { - LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException); - LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! 
This may be unrecoverable error!!"); - } - } else { - break; + long messageOffset = messageToCommit.getOffset(); + int partitionId = messageToCommit.getPartition(); + + if (offsetOfLastMessageCommittedToKafka < messageOffset) { + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", partitionId); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Committing message with offset:[" + messageOffset + "] to Kafka"); } + consumer.commit(partition, messageOffset); + offsetOfLastMessageCommittedToKafka = messageOffset; + } catch (Exception commitException) { + LOG.warn("Ranger tagsync already processed message at offset " + messageOffset + ". Ignoring failure in committing message:[" + messageToCommit + "]", commitException); } } if (LOG.isDebugEnabled()) { - LOG.debug("<== commitToKafka()"); + LOG.debug("<== commitToKafka(" + messageToCommit + ")"); } } }