This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push: new fe33f69ae RANGER-4129: ArrayIndexOutOfBounds exception may be thrown while processing events fe33f69ae is described below commit fe33f69ae5d4ac4f2aa9788523d0bb7313c150f2 Author: Abhay Kulkarni <ab...@apache.org> AuthorDate: Tue Mar 14 07:59:00 2023 -0700 RANGER-4129: ArrayIndexOutOfBounds exception may be thrown while processing events --- .../source/atlas/AtlasNotificationMapper.java | 52 ----------------- .../tagsync/source/atlas/AtlasTagSource.java | 67 +++++++++++----------- 2 files changed, 35 insertions(+), 84 deletions(-) diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index a7c456b3d..5d5ab8a7d 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -75,29 +75,6 @@ public class AtlasNotificationMapper { } } - public static ServiceTags processEntityNotification(EntityNotificationWrapper entityNotification) { - - ServiceTags ret = null; - - if (isNotificationHandled(entityNotification)) { - try { - RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entityNotification); - - if (entityNotification.getIsEntityDeleteOp()) { - ret = buildServiceTagsForEntityDeleteNotification(entityWithTags); - } else { - ret = buildServiceTags(entityWithTags, null); - } - - } catch (Exception exception) { - LOG.error("createServiceTags() failed!! 
", exception); - } - } else { - logUnhandledEntityNotification(entityNotification); - } - return ret; - } - public static Map<String, ServiceTags> processAtlasEntities(List<RangerAtlasEntityWithTags> atlasEntities) { Map<String, ServiceTags> ret = null; @@ -159,35 +136,6 @@ public class AtlasNotificationMapper { return ret; } - @SuppressWarnings("unchecked") - static ServiceTags buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags entityWithTags) { - final ServiceTags ret; - - RangerAtlasEntity entity = entityWithTags.getEntity(); - String guid = entity.getGuid(); - - if (StringUtils.isNotBlank(guid)) { - ret = new ServiceTags(); - RangerServiceResource serviceResource = new RangerServiceResource(); - serviceResource.setGuid(guid); - ret.getServiceResources().add(serviceResource); - } else { - ret = buildServiceTags(entityWithTags, null); - if (ret != null) { - // tag-definitions should NOT be deleted as part of service-resource delete - ret.setTagDefinitions(MapUtils.EMPTY_MAP); - // Ranger deletes tags associated with deleted service-resource - ret.setTags(MapUtils.EMPTY_MAP); - } - } - - if (ret != null) { - ret.setOp(ServiceTags.OP_DELETE); - } - - return ret; - } - static private Map<String, ServiceTags> buildServiceTags(List<RangerAtlasEntityWithTags> entitiesWithTags) { Map<String, ServiceTags> ret = new HashMap<>(); diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index 1a3ddecb5..a618cc986 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -87,7 +87,7 @@ public class AtlasTagSource extends AbstractTagSource { try { inputStream.close(); } catch (IOException ioException) { - LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", 
ioException); + LOG.error("Cannot close Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, ioException); } } } else { @@ -214,18 +214,17 @@ public class AtlasTagSource extends AbstractTagSource { if (AtlasNotificationMapper.isNotificationHandled(notificationWrapper)) { - RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notificationWrapper); - if ((notificationWrapper.getIsEntityDeleteOp() && !isHandlingDeleteOps) || (!notificationWrapper.getIsEntityDeleteOp() && isHandlingDeleteOps)) { - buildAndUploadServiceTags(); + if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) { + buildAndUploadServiceTags(); + } isHandlingDeleteOps = !isHandlingDeleteOps; } - atlasEntitiesWithTags.add(entityWithTags); + atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper)); } else { AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper); } - messages.add(message); } } else { @@ -256,43 +255,47 @@ public class AtlasTagSource extends AbstractTagSource { LOG.debug("==> buildAndUploadServiceTags()"); } - commitToKafka(); + if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) { - Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags); + commitToKafka(); - if (MapUtils.isNotEmpty(serviceTagsMap)) { - if (serviceTagsMap.size() != 1) { - LOG.warn("Unexpected!! Notifications for more than one service received by AtlasTagSource.. 
Service-Names:[" + serviceTagsMap.keySet() + "]"); - } - for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) { - if (isHandlingDeleteOps) { - entry.getValue().setOp(ServiceTags.OP_DELETE); - entry.getValue().setTagDefinitions(Collections.EMPTY_MAP); - entry.getValue().setTags(Collections.EMPTY_MAP); - } else { - entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE); + Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags); + + if (MapUtils.isNotEmpty(serviceTagsMap)) { + if (serviceTagsMap.size() != 1) { + LOG.warn("Unexpected!! Notifications for more than one service received by AtlasTagSource.. Service-Names:[" + serviceTagsMap.keySet() + "]"); } + for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) { + if (isHandlingDeleteOps) { + entry.getValue().setOp(ServiceTags.OP_DELETE); + entry.getValue().setTagDefinitions(Collections.EMPTY_MAP); + entry.getValue().setTags(Collections.EMPTY_MAP); + } else { + entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE); + } - if (LOG.isDebugEnabled()) { - Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); - String serviceTagsString = gsonBuilder.toJson(entry.getValue()); + if (LOG.isDebugEnabled()) { + Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); + String serviceTagsString = gsonBuilder.toJson(entry.getValue()); - LOG.debug("serviceTags=" + serviceTagsString); + LOG.debug("serviceTags=" + serviceTagsString); + } + updateSink(entry.getValue()); } - updateSink(entry.getValue()); } - } - offsetOfLastMessageDeliveredToRanger = messages.get(messages.size()-1).getOffset(); + offsetOfLastMessageDeliveredToRanger = messages.get(messages.size() - 1).getOffset(); - if (LOG.isDebugEnabled()) { - LOG.debug("Completed processing batch of messages of size:[" + messages.size() + "] received from NotificationConsumer"); - } + if 
(LOG.isDebugEnabled()) { + LOG.debug("Completed processing batch of messages of size:[" + messages.size() + "] received from NotificationConsumer"); + } + + commitToKafka(); - commitToKafka(); + atlasEntitiesWithTags.clear(); + messages.clear(); - atlasEntitiesWithTags.clear(); - messages.clear(); + } if (LOG.isDebugEnabled()) { LOG.debug("<== buildAndUploadServiceTags()");