This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new fe33f69ae RANGER-4129: ArrayIndexOutOfBounds exception may be thrown while processing events
fe33f69ae is described below

commit fe33f69ae5d4ac4f2aa9788523d0bb7313c150f2
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Tue Mar 14 07:59:00 2023 -0700

    RANGER-4129: ArrayIndexOutOfBounds exception may be thrown while processing events
---
 .../source/atlas/AtlasNotificationMapper.java      | 52 -----------------
 .../tagsync/source/atlas/AtlasTagSource.java       | 67 +++++++++++-----------
 2 files changed, 35 insertions(+), 84 deletions(-)

diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index a7c456b3d..5d5ab8a7d 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -75,29 +75,6 @@ public class AtlasNotificationMapper {
         }
     }
 
-    public static ServiceTags 
processEntityNotification(EntityNotificationWrapper entityNotification) {
-
-        ServiceTags ret = null;
-
-        if (isNotificationHandled(entityNotification)) {
-            try {
-                RangerAtlasEntityWithTags entityWithTags = new 
RangerAtlasEntityWithTags(entityNotification);
-
-                if (entityNotification.getIsEntityDeleteOp()) {
-                    ret = 
buildServiceTagsForEntityDeleteNotification(entityWithTags);
-                } else {
-                    ret = buildServiceTags(entityWithTags, null);
-                }
-
-            } catch (Exception exception) {
-                LOG.error("createServiceTags() failed!! ", exception);
-            }
-        } else {
-            logUnhandledEntityNotification(entityNotification);
-        }
-        return ret;
-    }
-
     public static Map<String, ServiceTags> 
processAtlasEntities(List<RangerAtlasEntityWithTags> atlasEntities) {
         Map<String, ServiceTags> ret = null;
 
@@ -159,35 +136,6 @@ public class AtlasNotificationMapper {
         return ret;
     }
 
-    @SuppressWarnings("unchecked")
-    static ServiceTags 
buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags 
entityWithTags) {
-        final ServiceTags ret;
-
-        RangerAtlasEntity   entity = entityWithTags.getEntity();
-        String              guid   = entity.getGuid();
-
-        if (StringUtils.isNotBlank(guid)) {
-            ret                                   = new ServiceTags();
-            RangerServiceResource serviceResource = new 
RangerServiceResource();
-            serviceResource.setGuid(guid);
-            ret.getServiceResources().add(serviceResource);
-        } else {
-            ret = buildServiceTags(entityWithTags, null);
-            if (ret != null) {
-                // tag-definitions should NOT be deleted as part of 
service-resource delete
-                ret.setTagDefinitions(MapUtils.EMPTY_MAP);
-                // Ranger deletes tags associated with deleted service-resource
-                ret.setTags(MapUtils.EMPTY_MAP);
-            }
-        }
-
-        if (ret != null) {
-            ret.setOp(ServiceTags.OP_DELETE);
-        }
-
-        return ret;
-    }
-
     static private Map<String, ServiceTags> 
buildServiceTags(List<RangerAtlasEntityWithTags> entitiesWithTags) {
 
         Map<String, ServiceTags> ret = new HashMap<>();
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 1a3ddecb5..a618cc986 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -87,7 +87,7 @@ public class AtlasTagSource extends AbstractTagSource {
                                        try {
                                                inputStream.close();
                                        } catch (IOException ioException) {
-                                               LOG.error("Cannot close Atlas 
application properties file, file-name:\" + 
TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException);
+                                               LOG.error("Cannot close Atlas 
application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, 
ioException);
                                        }
                                }
                        } else {
@@ -214,18 +214,17 @@ public class AtlasTagSource extends AbstractTagSource {
 
                                                                        if 
(AtlasNotificationMapper.isNotificationHandled(notificationWrapper)) {
 
-                                                                               
RangerAtlasEntityWithTags entityWithTags = new 
RangerAtlasEntityWithTags(notificationWrapper);
-
                                                                                
if ((notificationWrapper.getIsEntityDeleteOp() && !isHandlingDeleteOps) || 
(!notificationWrapper.getIsEntityDeleteOp() && isHandlingDeleteOps)) {
-                                                                               
        buildAndUploadServiceTags();
+                                                                               
        if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
+                                                                               
                buildAndUploadServiceTags();
+                                                                               
        }
                                                                                
        isHandlingDeleteOps = !isHandlingDeleteOps;
                                                                                
}
 
-                                                                               
atlasEntitiesWithTags.add(entityWithTags);
+                                                                               
atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper));
                                                                        } else {
                                                                                
AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper);
                                                                        }
-
                                                                        
messages.add(message);
                                                                }
                                                        } else {
@@ -256,43 +255,47 @@ public class AtlasTagSource extends AbstractTagSource {
                                LOG.debug("==> buildAndUploadServiceTags()");
                        }
 
-                       commitToKafka();
+                       if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
 
-                       Map<String, ServiceTags> serviceTagsMap = 
AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
+                               commitToKafka();
 
-                       if (MapUtils.isNotEmpty(serviceTagsMap)) {
-                               if (serviceTagsMap.size() != 1) {
-                                       LOG.warn("Unexpected!! Notifications 
for more than one service received by AtlasTagSource.. Service-Names:[" + 
serviceTagsMap.keySet() + "]");
-                               }
-                               for (Map.Entry<String, ServiceTags> entry : 
serviceTagsMap.entrySet()) {
-                                       if (isHandlingDeleteOps) {
-                                               
entry.getValue().setOp(ServiceTags.OP_DELETE);
-                                               
entry.getValue().setTagDefinitions(Collections.EMPTY_MAP);
-                                               
entry.getValue().setTags(Collections.EMPTY_MAP);
-                                       } else {
-                                               
entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE);
+                               Map<String, ServiceTags> serviceTagsMap = 
AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
+
+                               if (MapUtils.isNotEmpty(serviceTagsMap)) {
+                                       if (serviceTagsMap.size() != 1) {
+                                               LOG.warn("Unexpected!! 
Notifications for more than one service received by AtlasTagSource.. 
Service-Names:[" + serviceTagsMap.keySet() + "]");
                                        }
+                                       for (Map.Entry<String, ServiceTags> 
entry : serviceTagsMap.entrySet()) {
+                                               if (isHandlingDeleteOps) {
+                                                       
entry.getValue().setOp(ServiceTags.OP_DELETE);
+                                                       
entry.getValue().setTagDefinitions(Collections.EMPTY_MAP);
+                                                       
entry.getValue().setTags(Collections.EMPTY_MAP);
+                                               } else {
+                                                       
entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE);
+                                               }
 
-                                       if (LOG.isDebugEnabled()) {
-                                               Gson gsonBuilder = new 
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
-                                               String serviceTagsString = 
gsonBuilder.toJson(entry.getValue());
+                                               if (LOG.isDebugEnabled()) {
+                                                       Gson gsonBuilder = new 
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+                                                       String 
serviceTagsString = gsonBuilder.toJson(entry.getValue());
 
-                                               LOG.debug("serviceTags=" + 
serviceTagsString);
+                                                       
LOG.debug("serviceTags=" + serviceTagsString);
+                                               }
+                                               updateSink(entry.getValue());
                                        }
-                                       updateSink(entry.getValue());
                                }
-                       }
 
-                       offsetOfLastMessageDeliveredToRanger = 
messages.get(messages.size()-1).getOffset();
+                               offsetOfLastMessageDeliveredToRanger = 
messages.get(messages.size() - 1).getOffset();
 
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Completed processing batch of 
messages of size:[" + messages.size() + "] received from NotificationConsumer");
-                       }
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Completed processing batch 
of messages of size:[" + messages.size() + "] received from 
NotificationConsumer");
+                               }
+
+                               commitToKafka();
 
-                       commitToKafka();
+                               atlasEntitiesWithTags.clear();
+                               messages.clear();
 
-                       atlasEntitiesWithTags.clear();
-                       messages.clear();
+                       }
 
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("<== buildAndUploadServiceTags()");

Reply via email to