This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 3841a1b  ATLAS-3054: updated notification processing to support batch-commits
3841a1b is described below

commit 3841a1bfd41b485f3cdeaad05abba2b14289c9d6
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Thu Feb 21 07:31:56 2019 -0800

    ATLAS-3054: updated notification processing to support batch-commits
---
 .../store/graph/v2/AtlasEntityStream.java          | 18 +++----
 .../notification/NotificationHookConsumer.java     | 55 +++++++++++++++-------
 .../preprocessor/PreprocessorContext.java          | 29 ++++++++++++
 3 files changed, 77 insertions(+), 25 deletions(-)

diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
index 75a7e61..d12b036 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
@@ -30,27 +30,27 @@ public class AtlasEntityStream implements EntityStream {
 
 
     public AtlasEntityStream(AtlasEntity entity) {
-        this(new AtlasEntitiesWithExtInfo(entity));
+        this(new AtlasEntitiesWithExtInfo(entity), null);
     }
 
     public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
-        this(new AtlasEntitiesWithExtInfo(entityWithExtInfo));
+        this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), null);
     }
 
     public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
-        this.entitiesWithExtInfo = entitiesWithExtInfo;
-        this.iterator            = 
this.entitiesWithExtInfo.getEntities().iterator();
-        this.entityStream        = null;
+        this(entitiesWithExtInfo, null);
     }
 
     public AtlasEntityStream(AtlasEntity entity, EntityStream entityStream) {
-        this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entity);
-        this.iterator            = 
this.entitiesWithExtInfo.getEntities().iterator();
-        this.entityStream        = entityStream;
+        this(new AtlasEntitiesWithExtInfo(entity), entityStream);
     }
 
     public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, 
EntityStream entityStream) {
-        this.entitiesWithExtInfo = new 
AtlasEntitiesWithExtInfo(entityWithExtInfo);
+        this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), entityStream);
+    }
+
+    public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo, 
EntityStream entityStream) {
+        this.entitiesWithExtInfo = entitiesWithExtInfo;
         this.iterator            = 
this.entitiesWithExtInfo.getEntities().iterator();
         this.entityStream        = entityStream;
     }
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index f829260..6d03ba4 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -27,6 +27,7 @@ import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -113,8 +114,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String CONSUMER_RETRY_INTERVAL           = 
"atlas.notification.consumer.retry.interval";
     public static final String CONSUMER_MIN_RETRY_INTERVAL       = 
"atlas.notification.consumer.min.retry.interval";
     public static final String CONSUMER_MAX_RETRY_INTERVAL       = 
"atlas.notification.consumer.max.retry.interval";
+    public static final String CONSUMER_COMMIT_BATCH_SIZE        = 
"atlas.notification.consumer.commit.batch.size";
     public static final String CONSUMER_DISABLED                 = 
"atlas.notification.consumer.disabled";
 
+
     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633    
              = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
     public static final String 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN   
              = 
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
@@ -131,6 +134,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final int                           failedMsgCacheSize;
     private final int                           minWaitDuration;
     private final int                           maxWaitDuration;
+    private final int                           commitBatchSize;
     private final boolean                       skipHiveColumnLineageHive20633;
     private final int                           
skipHiveColumnLineageHive20633InputsThreshold;
     private final int                           
largeMessageProcessingTimeThresholdMs;
@@ -166,10 +170,11 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         consumerRetryInterval = 
applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
         minWaitDuration       = 
applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, 
consumerRetryInterval); // 500 ms  by default
         maxWaitDuration       = 
applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 
60);  //  30 sec by default
+        commitBatchSize       = 
applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
 
         skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
false);
         skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
-        consumerDisabled                                                       
  = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
+        consumerDisabled                              = 
applicationProperties.getBoolean(CONSUMER_DISABLED, false);
         largeMessageProcessingTimeThresholdMs         = 
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
 60 * 1000);  //  60 sec by default
 
         String[] patternHiveTablesToIgnore = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
@@ -500,7 +505,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
                                 }
 
-                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
+                                createOrUpdate(entities, false);
                             }
                             break;
 
@@ -521,7 +526,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                 // There should only be one root entity
                                 entities.getEntities().get(0).setGuid(guid);
 
-                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), true);
+                                createOrUpdate(entities, true);
                             }
                             break;
 
@@ -554,7 +559,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                 }
 
-                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
+                                createOrUpdate(entities, false);
                             }
                             break;
 
@@ -568,7 +573,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
                                 }
 
-                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
+                                createOrUpdate(entities, false);
                             }
                             break;
 
@@ -597,7 +602,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                 }
 
-                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
+                                createOrUpdate(entities, false);
                             }
                             break;
 
@@ -682,6 +687,32 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             }
         }
 
+        /**
+         * Creates/updates entities in atlasEntityStore; lists larger than commitBatchSize are split into consecutive batches
+         * of at most commitBatchSize, each sent in its own createOrUpdate call. A non-positive commitBatchSize disables batching.
+         */
+        private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate) throws AtlasBaseException {
+            List<AtlasEntity> entitiesList = entities.getEntities();
+            AtlasEntityStream entityStream = new AtlasEntityStream(entities);
+
+            if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) { // <= 0 would otherwise never advance the loop below
+                atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
+            } else {
+                for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
+                    int                      toIndex     = Math.min(fromIdx + commitBatchSize, entitiesList.size());
+                    AtlasEntitiesWithExtInfo batch       = new AtlasEntitiesWithExtInfo(new ArrayList<>(entitiesList.subList(fromIdx, toIndex)));
+                    // full entityStream passed as secondary stream - presumably to resolve references to entities outside this batch; confirm in AtlasEntityStream
+                    AtlasEntityStream        batchStream = new AtlasEntityStream(batch, entityStream);
+
+                    atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
+
+                    // clear RequestContext guid-updates and cache before the next batch
+                    RequestContext.get().resetEntityGuidUpdates();
+                    RequestContext.get().clearCache();
+                }
+            }
+        }
+
         private void recordFailedMessages() {
             //logging failed messages
             for (String message : failedMessages) {
@@ -767,6 +798,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         if (skipHiveColumnLineageHive20633) {
             skipHiveColumnLineage(context);
         }
+
+        context.moveRegisteredReferredEntities();
     }
 
     private void ignoreOrPruneHiveTables(PreprocessorContext context) {
@@ -801,16 +834,6 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                         }
                     }
                 }
-
-                for (String guid : context.getReferredEntitiesToMove()) {
-                    AtlasEntity entity = referredEntities.remove(guid);
-
-                    if (entity != null) {
-                        entities.add(entity);
-
-                        LOG.info("moved referred entity: typeName={}, 
qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), 
EntityPreprocessor.getQualifiedName(entity), context.getKafkaMessageOffset(), 
context.getKafkaPartition());
-                    }
-                }
             }
 
             int ignoredEntities = context.getIgnoredEntities().size();
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 7b4229b..2d2c09a 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -173,6 +173,35 @@ public class PreprocessorContext {
         collectGuids(obj, prunedEntities);
     }
 
+    /** Appends entities whose guids are in referredEntitiesToMove to entities (removing them from referredEntities), then clears referredEntitiesToMove; no-op if either collection is null. */
+    public void moveRegisteredReferredEntities() {
+        List<AtlasEntity>        entities         = getEntities();
+        Map<String, AtlasEntity> referredEntities = getReferredEntities();
+
+        if (entities != null && referredEntities != null && !referredEntitiesToMove.isEmpty()) {
+            AtlasEntity firstEntity = entities.isEmpty() ? null : entities.get(0);
+            int         movedCount  = 0; // guids not present in referredEntities are skipped, so this can be less than referredEntitiesToMove.size()
+            for (String guid : referredEntitiesToMove) {
+                AtlasEntity entity = referredEntities.remove(guid);
+
+                if (entity != null) {
+                    entities.add(entity);
+                    movedCount++;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                    }
+                }
+            }
+
+            if (firstEntity != null) {
+                LOG.info("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", movedCount, firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
+            } else {
+                LOG.info("moved {} referred-entities to entities-list. topic-offset={}, partition={}", movedCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+            }
+            referredEntitiesToMove.clear();
+        }
+    }
+
     public String getGuid(Object obj) {
         Object ret = null;
 

Reply via email to