This is an automated email from the ASF dual-hosted git repository.

radhikakundam pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f9dec5375 ATLAS-4335: Hook Notifications through Rest Interface
f9dec5375 is described below

commit f9dec5375499c8ced48c3885b690bab36a172419
Author: radhikakundam <radhikakun...@apache.org>
AuthorDate: Tue Dec 6 11:23:22 2022 -0800

    ATLAS-4335: Hook Notifications through Rest Interface
    
    Signed-off-by: radhikakundam <radhikakun...@apache.org>
    (cherry picked from commit 8d8169e83ac4acfb7b152594eac96ddbc9bb8437)
---
 addons/falcon-bridge/pom.xml                       |   5 -
 addons/hbase-bridge/pom.xml                        |   8 +-
 addons/hive-bridge/pom.xml                         |   5 +
 addons/impala-bridge/pom.xml                       |   5 -
 addons/sqoop-bridge/pom.xml                        |   5 -
 .../org/apache/atlas/authorize/AtlasPrivilege.java |   4 +-
 .../main/java/org/apache/atlas/AtlasClientV2.java  |  15 +-
 .../java/org/apache/atlas/AtlasConfiguration.java  |   5 +
 .../main/java/org/apache/atlas/AtlasErrorCode.java |   4 +-
 notification/pom.xml                               |   4 +-
 .../main/java/org/apache/atlas/hook/AtlasHook.java |   6 +
 .../org/apache/atlas/kafka/AtlasKafkaConsumer.java |  26 +++-
 .../org/apache/atlas/kafka/AtlasKafkaMessage.java  |  18 ++-
 .../org/apache/atlas/kafka/KafkaNotification.java  |  77 ++++++++++-
 .../apache/atlas/kafka/NotificationProvider.java   |  31 +++--
 .../AtlasNotificationMessageDeserializer.java      |   7 +
 .../atlas/notification/NotificationConsumer.java   |   7 +
 .../atlas/notification/NotificationInterface.java  |   3 +
 .../atlas/notification/rest/RestNotification.java  | 153 +++++++++++++++++++++
 .../AbstractNotificationConsumerTest.java          |   5 +
 .../atlas/notification/RestNotificationTest.java   | 136 ++++++++++++++++++
 .../notification/NotificationHookConsumer.java     | 117 ++++++++++++++--
 .../apache/atlas/web/rest/NotificationREST.java    | 121 ++++++++++++++++
 .../atlas/web/integration/NotificationRestIT.java  |  73 ++++++++++
 .../json/notifications/create-db-ddl.json          |   1 +
 .../resources/json/notifications/create-db.json    |   1 +
 .../resources/json/notifications/delete-db.json    |   1 +
 27 files changed, 789 insertions(+), 54 deletions(-)

diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index 81b0db332..18cd1fef3 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -185,11 +185,6 @@
                                             
<artifactId>jersey-json</artifactId>
                                             
<version>${jersey.version}</version>
                                         </artifactItem>
-                                        <artifactItem>
-                                            <groupId>javax.ws.rs</groupId>
-                                            <artifactId>jsr311-api</artifactId>
-                                            <version>${jsr.version}</version>
-                                        </artifactItem>
                                     </artifactItems>
                                 </configuration>
                             </execution>
diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml
index aac1bea57..aafe86936 100644
--- a/addons/hbase-bridge/pom.xml
+++ b/addons/hbase-bridge/pom.xml
@@ -63,7 +63,7 @@
         <dependency>
             <groupId>com.sun.jersey</groupId>
             <artifactId>jersey-bundle</artifactId>
-            <version>1.19</version>
+            <version>${jersey.version}</version>
             <scope>test</scope>
         </dependency>
 
@@ -399,11 +399,6 @@
                                             
<artifactId>jersey-bundle</artifactId>
                                             
<version>${jersey.version}</version>
                                         </artifactItem>
-                                        <artifactItem>
-                                            <groupId>javax.ws.rs</groupId>
-                                            <artifactId>jsr311-api</artifactId>
-                                            <version>${jsr.version}</version>
-                                        </artifactItem>
                                     </artifactItems>
                                 </configuration>
                             </execution>
@@ -621,7 +616,6 @@
                     </execution>
                 </executions>
             </plugin>
-
         </plugins>
     </build>
 </project>
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 018bd8709..70352d45c 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -311,6 +311,11 @@
                                             
<artifactId>jersey-json</artifactId>
                                             
<version>${jersey.version}</version>
                                         </artifactItem>
+                                        <artifactItem>
+                                            <groupId>com.sun.jersey</groupId>
+                                            
<artifactId>jersey-client</artifactId>
+                                            
<version>${jersey.version}</version>
+                                        </artifactItem>
                                     </artifactItems>
                                 </configuration>
                             </execution>
diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml
index 88bb73852..26303c308 100644
--- a/addons/impala-bridge/pom.xml
+++ b/addons/impala-bridge/pom.xml
@@ -331,11 +331,6 @@
                       <artifactId>jersey-json</artifactId>
                       <version>${jersey.version}</version>
                     </artifactItem>
-                    <artifactItem>
-                      <groupId>javax.ws.rs</groupId>
-                      <artifactId>jsr311-api</artifactId>
-                      <version>${jsr.version}</version>
-                    </artifactItem>
                   </artifactItems>
                 </configuration>
               </execution>
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index ac06f9460..319bedc08 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -249,11 +249,6 @@
                                             
<artifactId>jersey-json</artifactId>
                                             
<version>${jersey.version}</version>
                                         </artifactItem>
-                                        <artifactItem>
-                                            <groupId>javax.ws.rs</groupId>
-                                            <artifactId>jsr311-api</artifactId>
-                                            <version>${jsr.version}</version>
-                                        </artifactItem>
                                     </artifactItems>
                                 </configuration>
                             </execution>
diff --git a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
index 5d06e1b29..f270844c5 100644
--- a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
+++ b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
@@ -46,7 +46,9 @@ public enum AtlasPrivilege {
 
      TYPE_READ("type-read"),
 
-     ADMIN_AUDITS("admin-audits");
+     ADMIN_AUDITS("admin-audits"),
+
+     SERVICE_NOTIFICATION_POST("service-notification-post");
 
      private final String type;
 
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 6910b0e42..4970baaa9 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -134,6 +134,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
     private static final String GLOSSARY_CATEGORY    = GLOSSARY_URI + 
"/category";
     private static final String GLOSSARY_CATEGORIES  = GLOSSARY_URI + 
"/categories";
 
+    //Notification APIs
+    private static final String NOTIFICATION_URI         = BASE_URI + 
"v2/notification";
+
 
     public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) 
{
         super(baseUrl, basicAuthUserNamePassword);
@@ -173,7 +176,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
     }
 
     @VisibleForTesting
-    AtlasClientV2(WebResource service, Configuration configuration) {
+    public AtlasClientV2(WebResource service, Configuration configuration) {
         super(service, configuration);
     }
 
@@ -1024,6 +1027,14 @@ public class AtlasClientV2 extends AtlasBaseClient {
         return callAPI(API_V2.IMPORT_GLOSSARY, BulkImportResponse.class, 
multipartEntity);
     }
 
+    public void postNotificationToTopic(String topic, List<String> messages) 
throws AtlasServiceException {
+        callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC, 
topic), (Class<?>) null, messages);
+    }
+
+    @VisibleForTesting
+    public API formatPathWithParameter(API api, String... params) {
+        return formatPathParameters(api, params);
+    }
 
     @Override
     protected API formatPathParameters(API api, String... params) {
@@ -1199,6 +1210,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
         public static final API_V2 GET_BUSINESS_METADATA_TEMPLATE    = new 
API_V2(ENTITY_API + "businessmetadata/import/template", HttpMethod.GET, 
Response.Status.OK, MediaType.APPLICATION_JSON, 
MediaType.APPLICATION_OCTET_STREAM);
         public static final API_V2 IMPORT_BUSINESS_METADATA          = new 
API_V2(ENTITY_API + "businessmetadata/import", HttpMethod.POST, 
Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON);
 
+        public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC     = new 
API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST, 
Response.Status.NO_CONTENT);
+
         // labels APIs
         public static final API_V2 ADD_LABELS                        = new 
API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT, 
Response.Status.NO_CONTENT);
         public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE    = new 
API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT, 
Response.Status.NO_CONTENT);
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index e8c7a15ea..df886753f 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -40,9 +40,14 @@ public enum AtlasConfiguration {
     NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", 
"ATLAS_HOOK"),
     NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", 
"ATLAS_ENTITIES"),
 
+    
NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL("atlas.notification.consumer.message.buffering.interval.seconds",
 15),
+    
NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE("atlas.notification.consumer.message.buffering.batch.size",
 100),
+
+    NOTIFICATION_HOOK_REST_ENABLED("atlas.hook.rest.notification.enabled", 
false),
     
NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names",
 "ATLAS_HOOK"), //  a comma separated list of topic names
     
NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names",
 "ATLAS_ENTITIES"), //  a comma separated list of topic names
 
+    
NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES("atlas.notification.rest.body.max.length.bytes",
 (1 * 1024 * 1024)),
     
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes",
 (1000 * 1000)),
     
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled",
 true),
     
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
 15 * 60),
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 21ac7f78e..77a6fd8c3 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -176,6 +176,7 @@ public enum AtlasErrorCode {
     ATTRIBUTE_NAME_ALREADY_EXISTS_IN_ANOTHER_PARENT_TYPE(400, 
"ATLAS-400-00-09E", "Invalid attribute name: {0}.{1}. Attribute already exists 
in another parent type: {2}"),
     IMPORT_INVALID_ZIP_ENTRY(400, "ATLAS-400-00-09F", "{0}: invalid zip entry. 
Reason: {1}"),
     LINEAGE_ON_DEMAND_NOT_ENABLED(400, "ATLAS-400-00-100", "Lineage on demand 
config: {0} is not enabled"),
+    INVALID_TOPIC_NAME(400, "ATLAS-400-00-101", "Unsupported topic name : 
{0}"),
 
     UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to 
perform {1}"),
 
@@ -243,7 +244,8 @@ public enum AtlasErrorCode {
     ENTITY_NOTIFICATION_FAILED(500, "ATLAS-500-00-014", "Notification failed 
for operation: {0} : {1}"),
     FAILED_TO_UPLOAD(500, "ATLAS-500-00-015", "Error occurred while uploading 
the file: {0}"),
     FAILED_TO_CREATE_GLOSSARY_TERM(500, "ATLAS-500-00-016", "Error occurred 
while creating glossary term: {0}"),
-    FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred 
while updating glossary term: {0}");
+    FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred 
while updating glossary term: {0}"),
+    NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}");
 
     private String errorCode;
     private String errorMessage;
diff --git a/notification/pom.xml b/notification/pom.xml
index 6b4a03ef7..d0c596d57 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -42,7 +42,9 @@
 
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-server-api</artifactId>
+            <artifactId>atlas-client-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 24ea6ea83..4c70aedb9 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -21,6 +21,7 @@ package org.apache.atlas.hook;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.model.notification.HookNotification;
@@ -63,6 +64,7 @@ public abstract class AtlasHook {
     public static final String CONF_METADATA_NAMESPACE                         
   = "atlas.metadata.namespace";
     public static final String CLUSTER_NAME_KEY                                
   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME                            
   = "primary";
+    public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED           
   = "atlas.hook.messages.sort.enabled";
 
     protected static Configuration         atlasProperties;
     protected static NotificationInterface notificationInterface;
@@ -75,6 +77,8 @@ public abstract class AtlasHook {
     private static final int                  notificationMaxRetries;
     private static final int                  notificationRetryInterval;
     private static       ExecutorService      executor = null;
+    public  static final boolean              isRESTNotificationEnabled;
+    public  static final boolean              isHookMsgsSortEnabled;
 
 
     static {
@@ -95,6 +99,8 @@ public abstract class AtlasHook {
             failedMessagesLogger = null;
         }
 
+        isRESTNotificationEnabled = 
AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getBoolean();
+        isHookMsgsSortEnabled     = 
atlasProperties.getBoolean(CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED, 
isRESTNotificationEnabled);
         metadataNamespace         = getMetadataNamespace(atlasProperties);
         notificationMaxRetries    = 
atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
         notificationRetryInterval = 
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 96dc5856a..89ec59caa 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -73,6 +73,11 @@ public class AtlasKafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
         return receive(this.pollTimeoutMilliSeconds, 
lastCommittedPartitionOffset);
     }
 
+    @Override
+    public List<AtlasKafkaMessage<T>> 
receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> 
lastCommittedPartitionOffset) {
+        return receiveRawRecords(this.pollTimeoutMilliSeconds, 
lastCommittedPartitionOffset);
+    }
+
 
     @Override
     public void commit(TopicPartition partition, long offset) {
@@ -98,7 +103,15 @@ public class AtlasKafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
         }
     }
 
+    private List<AtlasKafkaMessage<T>> receiveRawRecords(long 
timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+        return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, 
true);
+    }
+
     private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, 
Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+        return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, 
false);
+    }
+
+    private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, 
Map<TopicPartition, Long> lastCommittedPartitionOffset, boolean 
isRawDataRequired) {
         List<AtlasKafkaMessage<T>> messages = new ArrayList();
 
         ConsumerRecords<?, ?> records = kafkaConsumer != null ? 
kafkaConsumer.poll(timeoutMilliSeconds) : null;
@@ -134,8 +147,17 @@ public class AtlasKafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
                     continue;
                 }
 
-                messages.add(new AtlasKafkaMessage(message, record.offset(), 
record.topic(), record.partition(),
-                                                            
deserializer.getMsgCreated(), deserializer.getSpooled()));
+                AtlasKafkaMessage kafkaMessage = null;
+
+                if (isRawDataRequired) {
+                    kafkaMessage = new AtlasKafkaMessage(message, 
record.offset(), record.topic(), record.partition(),
+                            deserializer.getMsgCreated(), 
deserializer.getSpooled(), deserializer.getSource(), record.value().toString());
+                } else {
+                    kafkaMessage = new AtlasKafkaMessage(message, 
record.offset(), record.topic(), record.partition(),
+                            deserializer.getMsgCreated(), 
deserializer.getSpooled(), deserializer.getSource());
+                }
+
+                messages.add(kafkaMessage);
             }
         }
 
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
index af3727df4..390eca7ba 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -26,13 +26,25 @@ public class AtlasKafkaMessage<T> {
     private final TopicPartition topicPartition;
     private final boolean        spooled;
     private final long           msgCreated;
+    private final String         source;
+    private final String         rawRecordData;
 
-    public AtlasKafkaMessage(T message, long offset, String topic, int 
partition, long msgCreated, boolean spooled) {
+    public AtlasKafkaMessage(T message, long offset, String topic, int 
partition, long msgCreated, boolean spooled, String source, String 
rawRecordData) {
         this.message        = message;
         this.offset         = offset;
         this.topicPartition = new TopicPartition(topic, partition);
         this.msgCreated     = msgCreated;
         this.spooled        = spooled;
+        this.source         = source;
+        this.rawRecordData  = rawRecordData;
+    }
+
+    public AtlasKafkaMessage(T message, long offset, String topic, int 
partition, long msgCreated, boolean spooled, String source) {
+        this(message, offset, topic, partition, msgCreated, spooled, source, 
null);
+    }
+
+    public AtlasKafkaMessage(T message, long offset, String topic, int 
partition, long msgCreated, boolean spooled) {
+        this(message, offset, topic, partition, msgCreated, spooled, null);
     }
 
     public AtlasKafkaMessage(T message, long offset, String topic, int 
partition) {
@@ -66,4 +78,8 @@ public class AtlasKafkaMessage<T> {
     public long getMsgCreated() {
         return this.msgCreated;
     }
+
+    public String getSource() { return this.source; }
+
+    public String getRawRecordData() { return this.rawRecordData; }
 }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 32f5183a0..870d50814 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -58,6 +58,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     public static final Logger LOG = 
LoggerFactory.getLogger(KafkaNotification.class);
 
     public    static final String PROPERTY_PREFIX            = "atlas.kafka";
+    public    static final String UNSORTED_POSTFIX           = "_UNSORTED";
     public    static final String ATLAS_HOOK_TOPIC           = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
     public    static final String ATLAS_ENTITIES_TOPIC       = 
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
@@ -67,9 +68,27 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
 
     private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This 
consumer has already been closed.";
 
+    public    static String ATLAS_HOOK_TOPIC_UNSORTED;
+    public    static String[] ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS;
+
+    static {
+        try {
+            ATLAS_HOOK_TOPIC_UNSORTED = ATLAS_HOOK_TOPIC + UNSORTED_POSTFIX;
+            ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS = ATLAS_HOOK_CONSUMER_TOPICS 
!= null && ATLAS_HOOK_CONSUMER_TOPICS.length > 0
+                    ? new String[ATLAS_HOOK_CONSUMER_TOPICS.length] : new 
String[] {ATLAS_HOOK_TOPIC_UNSORTED};
+
+            for (int i = 0; i < ATLAS_HOOK_CONSUMER_TOPICS.length; i++) {
+                ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS[i] = 
ATLAS_HOOK_CONSUMER_TOPICS[i] + UNSORTED_POSTFIX;
+            }
+        } catch (Exception e) {
+            LOG.error("Error while initializing Kafka Notification", e);
+        }
+    }
+
     private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = 
new HashMap<NotificationType, String>() {
         {
             put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
+            put(NotificationType.HOOK_UNSORTED, ATLAS_HOOK_TOPIC_UNSORTED);
             put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
         }
     };
@@ -77,6 +96,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = 
new HashMap<NotificationType, String[]>() {
         {
             put(NotificationType.HOOK, 
trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS));
+            put(NotificationType.HOOK_UNSORTED, 
trimAndPurge(ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS));
             put(NotificationType.ENTITIES, 
trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS));
         }
     };
@@ -86,6 +106,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     private final Map<NotificationType, List<KafkaConsumer>> consumers = new 
HashMap<>();
     private final Map<NotificationType, KafkaProducer>       producers = new 
HashMap<>();
     private       String                                     
consumerClosedErrorMsg;
+    private final Map<String, KafkaProducer>                 producersByTopic 
= new HashMap<>();
 
     // ----- Constructors ----------------------------------------------------
 
@@ -255,6 +276,21 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
         LOG.info("<== KafkaNotification.close()");
     }
 
+    //Sending messages received through HTTP or REST Notification Service to 
Producer
+    public void sendInternal(String topic, List<String> messages, boolean 
isSortNeeded) throws NotificationException {
+        KafkaProducer producer;
+        if (isSortNeeded) {
+            topic = topic + UNSORTED_POSTFIX;
+        }
+        producer = getOrCreateProducer(topic);
+        sendInternalToProducer(producer, topic, messages);
+    }
+
+    public void sendInternal(String topic, List<String> messages) throws 
NotificationException {
+        KafkaProducer producer = getOrCreateProducer(topic);
+
+        sendInternalToProducer(producer, topic, messages);
+    }
 
     // ----- AbstractNotification --------------------------------------------
     @Override
@@ -266,7 +302,11 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
 
     @VisibleForTesting
     void sendInternalToProducer(Producer p, NotificationType notificationType, 
List<String> messages) throws NotificationException {
-        String               topic           = 
PRODUCER_TOPIC_MAP.get(notificationType);
+        String topic = PRODUCER_TOPIC_MAP.get(notificationType);
+        sendInternalToProducer(p, topic, messages);
+    }
+
+    void sendInternalToProducer(Producer p, String topic , List<String> 
messages) throws NotificationException {
         List<MessageContext> messageContexts = new ArrayList<>();
 
         for (String message : messages) {
@@ -308,6 +348,9 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     public Properties getConsumerProperties(NotificationType notificationType) 
{
         // find the configured group id for the given notification type
         String groupId = 
properties.getProperty(notificationType.toString().toLowerCase() + "." + 
CONSUMER_GROUP_ID_PROPERTY);
+        if (StringUtils.isEmpty(groupId)) {
+            groupId = "atlas";
+        }
 
         if (StringUtils.isEmpty(groupId)) {
             throw new IllegalStateException("No configuration group id set for 
the notification type " + notificationType);
@@ -346,21 +389,45 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
     private KafkaProducer getOrCreateProducer(NotificationType 
notificationType) {
         LOG.debug("==> KafkaNotification.getOrCreateProducer()");
 
-        KafkaProducer ret = producers.get(notificationType);
+        KafkaProducer ret = getOrCreateProducerByCriteria(notificationType, 
producers, false);
+
+        LOG.debug("<== KafkaNotification.getOrCreateProducer()");
+
+        return ret;
+    }
+
+    private KafkaProducer getOrCreateProducer(String topic) {
+        LOG.debug("==> KafkaNotification.getOrCreateProducer() by Topic");
+
+        KafkaProducer ret = getOrCreateProducerByCriteria(topic, 
producersByTopic, true);
+
+        LOG.debug("<== KafkaNotification.getOrCreateProducer by Topic");
+
+        return ret;
+    }
+
+    private KafkaProducer getOrCreateProducerByCriteria(Object 
producerCriteria, Map producersByCriteria, boolean fetchByTopic) {
+        LOG.debug("==> KafkaNotification.getOrCreateProducerByCriteria()");
+
+        if ((fetchByTopic && !(producerCriteria instanceof String)) || 
(!fetchByTopic && !(producerCriteria instanceof NotificationType))) {
+            LOG.error("Error while retrieving Producer due to invalid 
criteria");
+        }
+
+        KafkaProducer ret = (KafkaProducer) 
producersByCriteria.get(producerCriteria);
 
         if (ret == null) {
             synchronized (this) {
-                ret = producers.get(notificationType);
+                ret = (KafkaProducer) 
producersByCriteria.get(producerCriteria);
 
                 if (ret == null) {
                     ret = new KafkaProducer(properties);
 
-                    producers.put(notificationType, ret);
+                    producersByCriteria.put(producerCriteria, ret);
                 }
             }
         }
 
-        LOG.debug("<== KafkaNotification.getOrCreateProducer()");
+        LOG.debug("<== KafkaNotification.getOrCreateProducerByCriteria()");
 
         return ret;
     }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
index b35af97fd..9d8686257 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
@@ -17,26 +17,28 @@
  */
 package org.apache.atlas.kafka;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.notification.LogConfigUtils;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.rest.RestNotification;
 import org.apache.atlas.notification.spool.AtlasFileSpool;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-
 /**
  * Provider class for Notification interfaces
  */
 public class NotificationProvider {
     private static final Logger LOG = 
LoggerFactory.getLogger(NotificationProvider.class);
 
-    private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED    = 
"atlas.hook.spool.enabled";
-    private static final String CONF_ATLAS_HOOK_SPOOL_DIR        = 
"atlas.hook.spool.dir";
+    @VisibleForTesting
+    public  static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = 
"atlas.hook.spool.enabled";
+    private static final String CONF_ATLAS_HOOK_SPOOL_DIR     = 
"atlas.hook.spool.dir";
 
     private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false;
 
@@ -45,25 +47,32 @@ public class NotificationProvider {
     public static NotificationInterface get() {
         if (notificationProvider == null) {
             try {
-                Configuration     conf     = ApplicationProperties.get();
-                KafkaNotification kafka    = new KafkaNotification(conf);
-                String            spoolDir = getSpoolDir(conf);
+                Configuration        conf        = ApplicationProperties.get();
+                String               spoolDir    = getSpoolDir(conf);
+                AbstractNotification absNotifier = null;
+
+                if (AtlasHook.isRESTNotificationEnabled) {
+                    absNotifier = new RestNotification(conf);
+                } else {
+                    absNotifier = new KafkaNotification(conf);
+                }
 
                 if (isSpoolingEnabled(conf) && 
StringUtils.isNotEmpty(spoolDir)) {
                     LOG.info("Notification spooling is enabled: spool 
directory={}", spoolDir);
 
                     conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir);
 
-                    notificationProvider = new AtlasFileSpool(conf, kafka);
+                    notificationProvider = new AtlasFileSpool(conf, 
absNotifier);
                 } else {
                     LOG.info("Notification spooling is not enabled");
 
-                    notificationProvider = kafka;
+                    notificationProvider = absNotifier;
                 }
             } catch (AtlasException e) {
-                throw new RuntimeException(e);
+                throw new RuntimeException("Error while initializing 
Notification interface", e);
             }
         }
+        LOG.debug("NotificationInterface of type {} is enabled", 
notificationProvider.getClass().getSimpleName());
         return notificationProvider;
     }
 
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 207747d7d..3048b9c95 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -65,6 +65,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> 
implements Message
     private final AtomicLong                          
messageCountSinceLastInterval = new AtomicLong(0);
     private long                                      msgCreated;
     private boolean                                   spooled;
+    private String                                    source;
     // ----- Constructors ----------------------------------------------------
 
     /**
@@ -112,6 +113,10 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
         return this.spooled;
     }
 
+    public String getSource() {
+        return this.source;
+    }
+
     @Override
     public T deserialize(String messageJson) {
         final T ret;
@@ -120,6 +125,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
         messageCountSinceLastInterval.incrementAndGet();
         this.msgCreated = 0;
         this.spooled = false;
+        this.source  = null;
 
         AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, 
AtlasNotificationMessage.class);
 
@@ -128,6 +134,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
         } else  {
             this.msgCreated = ((AtlasNotificationMessage) 
msg).getMsgCreationTime();
             this.spooled = ((AtlasNotificationMessage) msg).getSpooled();
+            this.source = msg.getSource() != null ? 
msg.getSource().getSource() : null;
 
             String msgJson = messageJson;
 
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
 
b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 1fb9f9989..83af92b64 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -63,4 +63,11 @@ public interface NotificationConsumer<T> {
      * @return List containing kafka message and partionId and offset.
      */
     List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, 
Long> lastCommittedPartitionOffset);
+
+    /**
+     * Fetch raw data for the topics from Kafka; if lastCommittedOffset is the same
+     * as the received message offset, it will proceed with commit.
+     * @return List containing kafka message and partitionId and offset.
+     */
+    List<AtlasKafkaMessage<T>> 
receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> 
lastCommittedPartitionOffset);
 }
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
 
b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index a9cd4a6bb..3fb616edb 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -46,6 +46,9 @@ public interface NotificationInterface {
         // Notifications from the Atlas integration hooks.
         HOOK(new HookMessageDeserializer()),
 
+        // Notifications from the Atlas integration hooks - unsorted.
+        HOOK_UNSORTED(new HookMessageDeserializer()),
+
         // Notifications to entity change consumers.
         ENTITIES(new EntityMessageDeserializer());
 
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java
 
b/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java
new file mode 100644
index 000000000..fb598f899
--- /dev/null
+++ 
b/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.rest;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_ENTITIES_TOPIC;
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
+
+public class RestNotification extends AbstractNotification {
+
+    private static final Logger  LOG                        = 
LoggerFactory.getLogger(RestNotification.class);
+    private static final int     BATCH_MAX_LENGTH_BYTES     = 
AtlasConfiguration.NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES.getInt();
+    private static final String  ATLAS_ENDPOINT             = 
"atlas.rest.address";
+    private static final String  BASIC_AUTH_USERNAME        = 
"atlas.rest.basic.auth.username";
+    private static final String  BASIC_AUTH_PASSWORD        = 
"atlas.rest.basic.auth.password";
+    private static final String  DEFAULT_ATLAS_URL          = 
"http://localhost:31000/";
+
+    private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = 
new HashMap<NotificationType, String>() {
+        {
+            put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
+            put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
+        }
+    };
+
+    @VisibleForTesting
+    public AtlasClientV2 atlasClientV2;
+
+    public RestNotification(Configuration configuration) throws AtlasException 
{
+        super();
+        setupAtlasClientV2(configuration);
+    }
+
+    private AtlasClientV2 setupAtlasClientV2(Configuration configuration) 
throws AtlasException {
+        if (atlasClientV2 != null) {
+            return atlasClientV2;
+        }
+        try {
+            String[] atlasEndPoint = 
configuration.getStringArray(ATLAS_ENDPOINT);
+
+            if (atlasEndPoint == null || atlasEndPoint.length == 0) {
+                atlasEndPoint = new String[] { DEFAULT_ATLAS_URL };
+            }
+
+            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+                String   fileAuthUsername          = 
configuration.getString(BASIC_AUTH_USERNAME, "admin");
+                String   fileAuthPassword          = 
configuration.getString(BASIC_AUTH_PASSWORD, "admin123");
+                String[] basicAuthUsernamePassword = (fileAuthUsername == null 
|| fileAuthPassword == null)
+                        ? AuthenticationUtil.getBasicAuthenticationInput()
+                        : new String[]{fileAuthUsername, fileAuthPassword};
+
+                atlasClientV2 = new AtlasClientV2(atlasEndPoint, 
basicAuthUsernamePassword);
+            } else {
+                atlasClientV2 = new AtlasClientV2(atlasEndPoint);
+            }
+        } catch (AtlasException e) {
+            throw new AtlasException(e);
+        }
+
+        return atlasClientV2;
+    }
+
+    @Override
+    public void sendInternal(NotificationType type, List<String> messages) 
throws NotificationException {
+        String              topic   = PRODUCER_TOPIC_MAP.get(type);
+        List<List<String>>  batches = getBatches(messages);
+
+        int batchCounter = 0;
+        try {
+            for (List<String> batch: batches) {
+                batchCounter++;
+                atlasClientV2.postNotificationToTopic(topic, batch);
+            }
+        } catch (AtlasServiceException e) {
+            if 
(e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode())) 
{
+                LOG.error("Sending notifications through REST interface failed 
starting from batch# {}", batchCounter);
+                throw new NotificationException(e);
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private List<List<String>> getBatches(List<String> messages) {
+        List<List<String>>  batches     = new ArrayList();
+        List<String>        batch       = new ArrayList();
+        int                 batchSize   = 0;
+
+        for (String message : messages) {
+            byte[] msgBytes = 
AtlasNotificationBaseMessage.getBytesUtf8(message);
+
+            if (batchSize > 0 && batchSize + msgBytes.length > 
BATCH_MAX_LENGTH_BYTES) {
+                batches.add(batch);
+
+                batch     = new ArrayList();
+                batchSize = 0;
+            }
+            batch.add(message);
+            batchSize += msgBytes.length;
+        }
+        batches.add(batch);
+        return batches;
+    }
+
+    @Override
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType, int numConsumers) {
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public boolean isReady(NotificationType type) {
+        return true;
+    }
+}
diff --git 
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
 
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 1b486e528..aee59a395 100644
--- 
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -229,6 +229,11 @@ public class AbstractNotificationConsumerTest {
         public List<AtlasKafkaMessage<TestMessage>> 
receiveWithCheckedCommit(Map<TopicPartition, Long> 
lastCommittedPartitionOffset) {
             return receive();
         }
+
+        @Override
+        public List<AtlasKafkaMessage<TestMessage>> 
receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> 
lastCommittedPartitionOffset) {
+            return null;
+        }
     }
 
     public static class TestMessageDeserializer extends 
AbstractMessageDeserializer<TestMessage> {
diff --git 
a/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java
 
b/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java
new file mode 100644
index 000000000..476518df6
--- /dev/null
+++ 
b/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasBaseClient;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.notification.rest.RestNotification;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+public class RestNotificationTest {
+
+    private NotificationInterface notifier;
+    private Configuration conf;
+
+    @Mock
+    private WebResource service;
+
+    @Mock
+    private WebResource.Builder resourceBuilderMock;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        conf = ApplicationProperties.get();
+
+        
conf.setProperty(AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getPropertyName(),
 true);
+        conf.setProperty(NotificationProvider.CONF_ATLAS_HOOK_SPOOL_ENABLED, 
false);
+
+        notifier = NotificationProvider.get();
+
+    }
+
+    private WebResource.Builder setupBuilder(AtlasClientV2.API api, 
WebResource webResource) {
+        when(webResource.path(api.getPath())).thenReturn(service);
+        when(webResource.path(api.getNormalizedPath())).thenReturn(service);
+
+        return getBuilder(service);
+    }
+
+    private WebResource.Builder getBuilder(WebResource resourceObject) {
+        
when(resourceObject.getRequestBuilder()).thenReturn(resourceBuilderMock);
+        when(resourceObject.path(anyString())).thenReturn(resourceObject);
+        
when(resourceBuilderMock.accept(MediaType.APPLICATION_JSON)).thenReturn(resourceBuilderMock);
+        
when(resourceBuilderMock.type(MediaType.MULTIPART_FORM_DATA)).thenReturn(resourceBuilderMock);
+        when(resourceBuilderMock.type(MediaType.APPLICATION_JSON + "; 
charset=UTF-8")).thenReturn(resourceBuilderMock);
+
+        return resourceBuilderMock;
+    }
+
+    @Test
+    public void testNotificationProvider () throws Exception {
+        assertEquals(notifier.getClass(), RestNotification.class);
+    }
+
+    @Test
+    public void testPostNotificationToTopic () throws Exception {
+        AtlasClientV2       client   = new AtlasClientV2(service, conf);
+        AtlasBaseClient.API api      = 
client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC,
 ATLAS_HOOK_TOPIC);
+        WebResource.Builder builder  = setupBuilder(api, service);
+        ClientResponse      response = mock(ClientResponse.class);
+
+        
when(response.getStatus()).thenReturn(Response.Status.NO_CONTENT.getStatusCode());
+        when(builder.method(anyString(), Matchers.<Class>any(), 
anyList())).thenReturn(response);
+
+        ((RestNotification)notifier).atlasClientV2 = client;
+
+        try {
+            
((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK,
 new ArrayList<String>(Arrays.asList("Dummy")));
+        } catch (NotificationException e) {
+            Assert.fail("Failed with Exception");
+        }
+    }
+
+    @Test
+    public void testNotificationException () throws Exception {
+        AtlasClientV2       client   = new AtlasClientV2(service, conf);
+        AtlasBaseClient.API api      = 
client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC,
 ATLAS_HOOK_TOPIC);
+        WebResource.Builder builder  = setupBuilder(api, service);
+        ClientResponse      response = mock(ClientResponse.class);
+
+        
when(response.getStatus()).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getHttpCode().getStatusCode());
+        
when(response.getEntity(String.class)).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode());
+        when(builder.method(anyString(), Matchers.<Class>any(), 
anyList())).thenReturn(response);
+
+        ((RestNotification)notifier).atlasClientV2 = client;
+
+        try {
+            
((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK,
 new ArrayList<String>(Arrays.asList("Dummy")));
+        } catch (NotificationException e) {
+            
Assert.assertTrue(e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode()));
+        }
+    }
+
+}
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1cdfcef8a..936423aa2 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,7 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.*;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -83,6 +85,7 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -91,6 +94,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -123,6 +127,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private static final String 
EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
     private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
 
+    private static final String ATLAS_HOOK_CONSUMER_THREAD_NAME          = 
"atlas-hook-consumer-thread";
+    private static final String ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME = 
"atlas-hook-unsorted-consumer-thread";
+
     // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
     public static final String DUMMY_DATABASE               = 
"_dummy_database";
     public static final String DUMMY_TABLE                  = "_dummy_table";
@@ -195,6 +202,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private       Instant                       nextStatsLogTime = 
AtlasMetricsCounter.getNextHourStartTime(Instant.now());
     private final Map<TopicPartition, Long>     lastCommittedPartitionOffset;
     private final EntityCorrelationManager      entityCorrelationManager;
+    private final long                          consumerMsgBufferingIntervalMS;
+    private final int                           consumerMsgBufferingBatchSize;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -230,6 +239,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         largeMessageProcessingTimeThresholdMs         = 
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
 60 * 1000);  //  60 sec by default
         createShellEntityForNonExistingReference      = 
AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
         authorizeUsingMessageUser                     = 
applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
+        consumerMsgBufferingIntervalMS                = 
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL.getInt() * 
1000;
+        consumerMsgBufferingBatchSize                 = 
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE.getInt();
 
         int authnCacheTtlSeconds = 
applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
 
@@ -350,17 +361,35 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     }
 
     private void startConsumers(ExecutorService executorService) {
-        int                                          numThreads            = 
applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+        int                                                           
numThreads                  = 
applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+        Map<NotificationConsumer<HookNotification>, NotificationType> 
notificationConsumersByType = new HashMap<>();
+
         List<NotificationConsumer<HookNotification>> notificationConsumers = 
notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
+        for (NotificationConsumer<HookNotification> notificationConsumer : 
notificationConsumers) {
+            notificationConsumersByType.put(notificationConsumer, 
NotificationType.HOOK);
+        }
+
+        if (AtlasHook.isHookMsgsSortEnabled) {
+            List<NotificationConsumer<HookNotification>> 
unsortedNotificationConsumers = 
notificationInterface.createConsumers(NotificationType.HOOK_UNSORTED, 
numThreads);
+            for (NotificationConsumer<HookNotification> 
unsortedNotificationConsumer : unsortedNotificationConsumers) {
+                notificationConsumersByType.put(unsortedNotificationConsumer, 
NotificationType.HOOK_UNSORTED);
+            }
+        }
 
         if (executorService == null) {
-            executorService = 
Executors.newFixedThreadPool(notificationConsumers.size(), new 
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
+            executorService = 
Executors.newFixedThreadPool(notificationConsumersByType.size(), new 
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
         }
 
         executors = executorService;
 
-        for (final NotificationConsumer<HookNotification> consumer : 
notificationConsumers) {
-            HookConsumer hookConsumer = new HookConsumer(consumer);
+        for (final NotificationConsumer<HookNotification> consumer : 
notificationConsumersByType.keySet()) {
+            String hookConsumerName   = ATLAS_HOOK_CONSUMER_THREAD_NAME;
+
+            if 
(notificationConsumersByType.get(consumer).equals(NotificationType.HOOK_UNSORTED))
 {
+                hookConsumerName   = ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME;
+            }
+
+            HookConsumer hookConsumer = new HookConsumer(hookConsumerName, 
consumer);
 
             consumers.add(hookConsumer);
             executors.submit(hookConsumer);
@@ -529,8 +558,16 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         private final List<String>                           failedMessages = 
new ArrayList<>();
         private final AdaptiveWaiter                         adaptiveWaiter = 
new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
+        private int duplicateKeyCounter = 1;
+
         public HookConsumer(NotificationConsumer<HookNotification> consumer) {
-            super("atlas-hook-consumer-thread");
+            super(ATLAS_HOOK_CONSUMER_THREAD_NAME);
+
+            this.consumer = consumer;
+        }
+
+        public HookConsumer(String consumerThreadName, 
NotificationConsumer<HookNotification> consumer) {
+            super(consumerThreadName);
 
             this.consumer = consumer;
         }
@@ -548,10 +585,15 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             try {
                 while (shouldRun.get()) {
                     try {
-                        List<AtlasKafkaMessage<HookNotification>> messages = 
consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
-
-                        for (AtlasKafkaMessage<HookNotification> msg : 
messages) {
-                            handleMessage(msg);
+                        if 
(StringUtils.equals(ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME, this.getName())) {
+                            long                                            
msgBufferingStartTime = System.currentTimeMillis();
+                            Map<String,AtlasKafkaMessage<HookNotification>> 
msgBuffer             = new TreeMap<>();
+                            
sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer);
+                        } else {
+                            List<AtlasKafkaMessage<HookNotification>> messages 
= consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
+                            for (AtlasKafkaMessage<HookNotification> msg : 
messages) {
+                                handleMessage(msg);
+                            }
                         }
                     } catch (IllegalStateException ex) {
                         adaptiveWaiter.pause(ex);
@@ -576,6 +618,63 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             }
         }
 
+        private void resetDuplicateKeyCounter() {
+            duplicateKeyCounter = 1;
+        }
+
+        private String getKey(String msgCreated, String source) {
+            return String.format("%s_%s", msgCreated, source);
+        }
+
+        private void sortMessages(AtlasKafkaMessage<HookNotification> msg, 
Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer) {
+            String key = getKey(Long.toString(msg.getMsgCreated()), 
msg.getSource());
+            if (msgBuffer.containsKey(key)) { // Duplicate keys are possible for 
messages from the same source with the same msgCreationTime
+                key = getKey(key, Integer.toString(duplicateKeyCounter));
+                duplicateKeyCounter++;
+            }
+            msgBuffer.put(key, msg);
+        }
+
+        void sortAndPublishMsgsToAtlasHook(long msgBufferingStartTime, 
Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer) throws 
NotificationException {
+
+            List<AtlasKafkaMessage<HookNotification>> messages     = 
consumer.receiveRawRecordsWithCheckedCommit(lastCommittedPartitionOffset);
+            AtlasKafkaMessage<HookNotification>       maxOffsetMsg = null;
+            long                                      maxOffsetProcessed = 0;
+
+            messages.stream().forEach(x -> sortMessages(x, msgBuffer));
+
+            if (msgBuffer.size() < consumerMsgBufferingBatchSize && 
System.currentTimeMillis() - msgBufferingStartTime < 
consumerMsgBufferingIntervalMS) {
+                sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, 
msgBuffer);
+                return;
+            }
+
+            for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) 
{
+                String hookTopic   = StringUtils.isNotEmpty(msg.getTopic()) ? 
msg.getTopic().split(KafkaNotification.UNSORTED_POSTFIX)[0] : 
KafkaNotification.ATLAS_HOOK_TOPIC;
+                if (maxOffsetProcessed == 0 || maxOffsetProcessed < 
msg.getOffset()) {
+                    maxOffsetMsg       = msg;
+                    maxOffsetProcessed = msg.getOffset();
+                }
+
+                
((KafkaNotification)notificationInterface).sendInternal(hookTopic,
+                        StringUtils.isNotEmpty(msg.getRawRecordData()) ? 
Arrays.asList(msg.getRawRecordData()) : 
Arrays.asList(msg.getMessage().toString()));
+            }
+
+
+            /** In case of failure while publishing sorted messages (the for loop 
above), consuming of unsorted messages should start from the initial offset.
+              * This for loop is explicitly kept separate so that messages are 
committed only after all batch messages have been sent to the hook topic.
+              */
+            for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) 
{
+                commit(msg);
+            }
+
+            if (maxOffsetMsg != null) {
+                commit(maxOffsetMsg);
+            }
+
+            msgBuffer.clear();
+            resetDuplicateKeyCounter();
+        }
+
         @VisibleForTesting
         void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
             AtlasPerfTracer  perf           = null;
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java 
b/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java
new file mode 100644
index 000000000..295579e46
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.web.util.Servlets;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.*;
+
+@Path("v2/notification")
+@Singleton
+@Service
+@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+public class NotificationREST {
+    private static final Logger      LOG                            = LoggerFactory.getLogger(NotificationREST.class);
+    public  static final String      ATLAS_HOOK_TOPIC               = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+    public  static final String      ATLAS_ENTITIES_TOPIC           = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
+    private static final String[]    ATLAS_HOOK_CONSUMER_TOPICS     = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
+    private static final String[]    ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
+    private static final Set<String> TOPICS                         = new HashSet<>();
+
+    private final NotificationInterface notificationInterface;
+
+    static {
+        TOPICS.addAll(Arrays.asList(ATLAS_HOOK_CONSUMER_TOPICS));
+        TOPICS.addAll(Arrays.asList(ATLAS_ENTITIES_CONSUMER_TOPICS));
+    }
+
+    @Inject
+    public NotificationREST(NotificationInterface notificationInterface) {
+        this.notificationInterface = notificationInterface;
+    }
+
+    /**
+     * Publishes the notification messages carried in the request body on a Kafka topic.
+     * The body is read as a JSON array of messages; if it cannot be parsed as an array it is sent as a single message.
+     *
+     * @param topicName name of the topic; must be one of the configured hook or entities consumer topics
+     * @param request   HTTP request whose payload carries the messages
+     * @throws AtlasBaseException if the topic is not allowed, the injected notifier is not a KafkaNotification, or publishing fails
+     * @throws IOException        declared for reading the request payload
+     */
+    @POST
+    @Path("/topic/{topicName}")
+    @Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+    public void handleNotifications(@PathParam("topicName") String topicName, @Context HttpServletRequest request) throws AtlasBaseException, IOException {
+        LOG.debug("Handling notifications for topic {}", topicName);
+        AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.SERVICE_NOTIFICATION_POST), "post on rest notification service");
+
+        if (!TOPICS.contains(topicName)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_TOPIC_NAME, topicName);
+        }
+        // the publish call below needs a KafkaNotification: report a clear error instead of a ClassCastException
+        if (!(notificationInterface instanceof KafkaNotification)) {
+            throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_EXCEPTION, "unsupported notification interface for publishing to a topic: " + notificationInterface);
+        }
+
+        List<String> messages = getMessagesToNotify(Servlets.getRequestPayload(request));
+        try {
+            ((KafkaNotification) notificationInterface).sendInternal(topicName, messages, AtlasHook.isHookMsgsSortEnabled);
+        } catch (NotificationException exception) {
+            // surface every message that could not be published, one per line
+            throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_EXCEPTION, exception, StringUtils.join(exception.getFailedMessages(), "\n"));
+        }
+    }
+
+    /** Splits a JSON-array payload into one JSON string per element; a payload that fails to parse is returned as a single message. */
+    private List<String> getMessagesToNotify(String messagesAsJson) {
+        List<String> messages = new ArrayList<>();
+
+        try {
+            for (JsonNode messageNode : AtlasJson.parseToV1ArrayNode(messagesAsJson)) {
+                messages.add(AtlasJson.toV1Json(messageNode));
+            }
+        } catch (IOException e) {
+            LOG.debug("Request payload could not be parsed as a JSON array; publishing it as a single message", e);
+            messages.add(messagesAsJson);
+        }
+
+        return messages;
+    }
+}
\ No newline at end of file
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java 
b/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java
new file mode 100644
index 000000000..2c907598d
--- /dev/null
+++ 
b/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.integration;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+public class NotificationRestIT extends BaseResourceIT {
+
+    /** Posting with invalid credentials must fail with an AtlasServiceException that carries a status. */
+    @Test
+    public void unAuthPostNotification() throws IOException {
+        AtlasClientV2 unAuthClient = new AtlasClientV2(atlasUrls, new String[]{"admin", "wr0ng_pa55w0rd"});
+
+        try {
+            unAuthClient.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList("Dummy")));
+            // without this the test passes silently when the post is accepted despite the bad credentials
+            org.testng.Assert.fail("expected AtlasServiceException for a client with invalid credentials");
+        } catch (AtlasServiceException e) {
+            assertNotNull(e.getStatus(), "expected server error code in the status");
+        }
+    }
+
+    /** Posts a create-db hook message and waits until exactly one database with the generated qualifiedName is searchable. */
+    @Test
+    public void postNotificationBasicTest() throws Exception {
+        String db_name            = "db_" + randomString();
+        String cluster_name       = "cl" + randomString();
+        String qualifiedName      = db_name + "@" + cluster_name;
+        String notificationString = TestResourceFileUtils.getJson("notifications/create-db")
+                .replaceAll("--name--", db_name).replaceAll("--clName--", cluster_name)
+                .replace("\"--ts--\"", String.valueOf((new Date()).getTime()));
+
+        // no try/catch: a failed post must fail the test instead of skipping the verification below
+        atlasClientV2.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList(notificationString)));
+
+        waitFor(MAX_WAIT_TIME, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                ArrayNode results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, qualifiedName));
+
+                return results.size() == 1;
+            }
+        });
+    }
+}
diff --git a/webapp/src/test/resources/json/notifications/create-db-ddl.json 
b/webapp/src/test/resources/json/notifications/create-db-ddl.json
new file mode 100644
index 000000000..443495dde
--- /dev/null
+++ b/webapp/src/test/resources/json/notifications/create-db-ddl.json
@@ -0,0 +1 @@
+{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.100.78","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_CREATE_V2","user":"root","entities":{"entities":[{"typeName":"hive_db_ddl","attributes":{"serviceType":"hive","qualifiedName":"--name--@--clName--:--execTime--","execTime":"--execTime--","queryText":"create
 database --name--","name":"create database --name--" [...]
\ No newline at end of file
diff --git a/webapp/src/test/resources/json/notifications/create-db.json 
b/webapp/src/test/resources/json/notifications/create-db.json
new file mode 100644
index 000000000..8df0e4dcc
--- /dev/null
+++ b/webapp/src/test/resources/json/notifications/create-db.json
@@ -0,0 +1 @@
+{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.10.4","msgCreatedBy":"hive","msgCreationTime":"--ts--","message":{"type":"ENTITY_CREATE_V2","user":"hive","entities":{"referredEntities":{},"entities":[{"typeName":"hive_db","attributes":{"owner":"admin","ownerType":"USER","managedLocation":null,"qualifiedName":"--name--@--clName--","clusterName":"--clName--","name":"--name--","location":"some_location","p
 [...]
\ No newline at end of file
diff --git a/webapp/src/test/resources/json/notifications/delete-db.json 
b/webapp/src/test/resources/json/notifications/delete-db.json
new file mode 100644
index 000000000..26e82e917
--- /dev/null
+++ b/webapp/src/test/resources/json/notifications/delete-db.json
@@ -0,0 +1 @@
+{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.72.140","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_DELETE_V2","user":"hive","entities":[{"typeName":"hive_db","uniqueAttributes":{"qualifiedName":"--name--@--clName--"}}]}}
\ No newline at end of file

Reply via email to