Repository: incubator-atlas
Updated Branches:
  refs/heads/master 86dd72aff -> a2e7738aa


ATLAS-901 Log messages that cannot be sent to Kafka to a specific log 
configuration (yhemanth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a2e7738a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a2e7738a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a2e7738a

Branch: refs/heads/master
Commit: a2e7738aa25bba20bafa5f42ee1d628807a26b52
Parents: 86dd72a
Author: Hemanth Yamijala <[email protected]>
Authored: Fri Jun 17 14:58:13 2016 +0530
Committer: Hemanth Yamijala <[email protected]>
Committed: Fri Jun 17 14:58:33 2016 +0530

----------------------------------------------------------------------
 docs/src/site/twiki/Configuration.twiki         | 10 ++-
 .../java/org/apache/atlas/hook/AtlasHook.java   | 36 +++++++-
 .../apache/atlas/hook/FailedMessagesLogger.java | 95 ++++++++++++++++++++
 .../apache/atlas/kafka/KafkaNotification.java   | 46 ++++++++--
 .../notification/NotificationException.java     | 13 +++
 .../org/apache/atlas/hook/AtlasHookTest.java    | 91 +++++++++++++++++--
 .../atlas/kafka/KafkaNotificationTest.java      | 85 ++++++++++++++++++
 release-log.txt                                 |  1 +
 8 files changed, 362 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki 
b/docs/src/site/twiki/Configuration.twiki
index 0e122fe..3ad0fbe 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -168,9 +168,17 @@ atlas.notification.replicas=1
 atlas.notification.kafka.service.principal=kafka/[email protected]
 # Set this to the location of the keytab file for Kafka
 
#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
-
 </verbatim>
 
+These configuration parameters are useful for saving messages in case there 
are issues in reaching Kafka for
+sending messages.
+
+<verbatim>
+# Whether to save messages that failed to be sent to Kafka, default is true
+atlas.notification.log.failed.messages=true
+# If saving messages is enabled, the file name to save them to. This file will 
be created under the log directory of the hook's host component - like 
HiveServer2
+atlas.notification.failed.messages.filename=atlas_hook_failed_messages.log
+</verbatim>
 
 ---++ Client Configs
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 71029b0..2ca8d85 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -18,9 +18,11 @@
 
 package org.apache.atlas.hook;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
 import org.apache.atlas.notification.hook.HookNotification;
@@ -50,6 +52,15 @@ public abstract class AtlasHook {
 
     protected static NotificationInterface notifInterface;
 
+    private static boolean logFailedMessages;
+    private static FailedMessagesLogger failedMessagesLogger;
+
+    public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY 
=
+            "atlas.notification.failed.messages.filename";
+    public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = 
"atlas_hook_failed_messages.log";
+    public static final String 
ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY =
+            "atlas.notification.log.failed.messages";
+
     static {
         try {
             atlasProperties = ApplicationProperties.get();
@@ -57,6 +68,14 @@ public abstract class AtlasHook {
             LOG.info("Failed to load application properties", e);
         }
 
+        String failedMessageFile = 
atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY,
+                ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
+        logFailedMessages = 
atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, 
true);
+        if (logFailedMessages) {
+            failedMessagesLogger = new FailedMessagesLogger(failedMessageFile);
+            failedMessagesLogger.init();
+        }
+
         Injector injector = Guice.createInjector(new NotificationModule());
         notifInterface = injector.getInstance(NotificationInterface.class);
 
@@ -89,18 +108,31 @@ public abstract class AtlasHook {
      * @param maxRetries maximum number of retries while sending message to 
messaging system
      */
     public static void 
notifyEntities(List<HookNotification.HookNotificationMessage> messages, int 
maxRetries) {
+        notifyEntitiesInternal(messages, maxRetries, notifInterface, 
logFailedMessages, failedMessagesLogger);
+    }
+
+    @VisibleForTesting
+    static void 
notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, 
int maxRetries,
+                                       NotificationInterface 
notificationInterface,
+                                       boolean shouldLogFailedMessages, 
FailedMessagesLogger logger) {
         final String message = messages.toString();
 
         int numRetries = 0;
         while (true) {
             try {
-                
notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+                
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
messages);
                 return;
-            } catch(Exception e) {
+            } catch (Exception e) {
                 numRetries++;
                 if (numRetries < maxRetries) {
                     LOG.debug("Failed to notify atlas for entity {}. 
Retrying", message, e);
                 } else {
+                    if (shouldLogFailedMessages && e instanceof 
NotificationException) {
+                        List<String> failedMessages = ((NotificationException) 
e).getFailedMessages();
+                        for (String msg : failedMessages) {
+                            logger.log(msg);
+                        }
+                    }
                     LOG.error("Failed to notify atlas for entity {} after {} 
retries. Quitting",
                             message, maxRetries, e);
                     return;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java 
b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
new file mode 100644
index 0000000..0b552d3
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hook;
+
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.DailyRollingFileAppender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Enumeration;
+
+/**
+ * A logger wrapper that can be used to write messages that failed to be sent 
to a log file.
+ */
+public class FailedMessagesLogger {
+
+    public static final String PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE = 
"%d{ISO8601} %m%n";
+    public static final String DATE_PATTERN = ".yyyy-MM-dd";
+
+    private final Logger logger = 
Logger.getLogger("org.apache.atlas.hook.FailedMessagesLogger");
+    private String failedMessageFile;
+
+    public FailedMessagesLogger(String failedMessageFile) {
+        this.failedMessageFile = failedMessageFile;
+    }
+
+    void init() {
+        String rootLoggerDirectory = getRootLoggerDirectory();
+        if (rootLoggerDirectory == null) {
+            return;
+        }
+        String absolutePath = new File(rootLoggerDirectory, 
failedMessageFile).getAbsolutePath();
+        try {
+            DailyRollingFileAppender failedLogFilesAppender = new 
DailyRollingFileAppender(
+                    new PatternLayout(PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE), 
absolutePath, DATE_PATTERN);
+            logger.addAppender(failedLogFilesAppender);
+            logger.setLevel(Level.ERROR);
+            logger.setAdditivity(false);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Get the root logger file location under which the failed log messages 
will be written.
+     *
+     * Since this class is used in Hooks which run within JVMs of other 
components like Hive,
+     * we want to write the failed messages file under the same location as 
where logs from
+     * the host component are saved. This method attempts to get such a 
location from the
+     * root logger's appenders. It will work only if at least one of the 
appenders is a {@link FileAppender}
+     *
+     * @return directory under which host component's logs are stored.
+     */
+    private String getRootLoggerDirectory() {
+        String rootLoggerDirectory = null;
+        org.apache.log4j.Logger rootLogger = 
org.apache.log4j.Logger.getRootLogger();
+
+        Enumeration allAppenders = rootLogger.getAllAppenders();
+        while (allAppenders.hasMoreElements()) {
+            Appender appender = (Appender) allAppenders.nextElement();
+            if (appender instanceof FileAppender) {
+                FileAppender fileAppender = (FileAppender) appender;
+                String rootLoggerFile = fileAppender.getFile();
+                rootLoggerDirectory = new File(rootLoggerFile).getParent();
+                break;
+            }
+        }
+        return rootLoggerDirectory;
+    }
+
+    void log(String message) {
+        logger.error(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 1ee62d2..806f2b4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -37,6 +37,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -90,6 +91,10 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
         }
     };
 
+    @VisibleForTesting
+    String getTopicName(NotificationType notificationType) {
+        return TOPIC_MAP.get(notificationType);
+    }
 
     // ----- Constructors ----------------------------------------------------
 
@@ -214,24 +219,36 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
         if (producer == null) {
             createProducer();
         }
+        sendInternalToProducer(producer, type, messages);
+    }
 
+    @VisibleForTesting
+    void sendInternalToProducer(Producer p, NotificationType type, String[] 
messages) throws NotificationException {
         String topic = TOPIC_MAP.get(type);
-        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        List<MessageContext> messageContexts = new ArrayList<>();
         for (String message : messages) {
             ProducerRecord record = new ProducerRecord(topic, message);
             LOG.debug("Sending message for topic {}: {}", topic, message);
-            futures.add(producer.send(record));
+            Future future = p.send(record);
+            messageContexts.add(new MessageContext(future, message));
         }
 
-        for (Future<RecordMetadata> future : futures) {
+        List<String> failedMessages = new ArrayList<>();
+        Exception lastFailureException = null;
+        for (MessageContext context : messageContexts) {
             try {
-                RecordMetadata response = future.get();
+                RecordMetadata response = context.getFuture().get();
                 LOG.debug("Sent message for topic - {}, partition - {}, offset 
- {}", response.topic(),
                     response.partition(), response.offset());
             } catch (Exception e) {
-                throw new NotificationException(e);
+                LOG.warn("Could not send message - {}", context.getMessage(), 
e);
+                lastFailureException = e;
+                failedMessages.add(context.getMessage());
             }
         }
+        if (lastFailureException != null) {
+            throw new NotificationException(lastFailureException, 
failedMessages);
+        }
     }
 
     // ----- helper methods --------------------------------------------------
@@ -359,4 +376,23 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
             }
         }
     }
+
+    private class MessageContext {
+
+        private final Future<RecordMetadata> future;
+        private final String message;
+
+        public MessageContext(Future<RecordMetadata> future, String message) {
+            this.future = future;
+            this.message = message;
+        }
+
+        public Future<RecordMetadata> getFuture() {
+            return future;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
 
b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
index d9d89df..2dd9c9f 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
@@ -19,11 +19,24 @@ package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasException;
 
+import java.util.List;
+
 /**
  * Exception from notification.
  */
 public class NotificationException extends AtlasException {
+    private List<String> failedMessages;
+
     public NotificationException(Exception e) {
         super(e);
     }
+
+    public NotificationException(Exception e, List<String> failedMessages) {
+        super(e);
+        this.failedMessages = failedMessages;
+    }
+
+    public List<String> getFailedMessages() {
+        return failedMessages;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java 
b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index 16cb0f0..9854bcc 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -21,24 +21,101 @@ package org.apache.atlas.hook;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.hook.HookNotification;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 
 public class AtlasHookTest {
+
+    @Mock
+    private NotificationInterface notificationInterface;
+
+    @Mock
+    private FailedMessagesLogger failedMessagesLogger;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test (timeOut = 10000)
+    public void testNotifyEntitiesDoesNotHangOnException() throws Exception {
+        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, 
notificationInterface, false,
+                failedMessagesLogger);
+        // if we've reached here, the method finished OK.
+    }
+
+    @Test
+    public void testNotifyEntitiesRetriesOnException() throws 
NotificationException {
+        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, false,
+                failedMessagesLogger);
+
+        verify(notificationInterface, times(2)).
+                send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
+    }
+
+    @Test
+    public void testFailedMessageIsLoggedIfRequired() throws 
NotificationException {
+        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message")))
+                .when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+                failedMessagesLogger);
+
+        verify(failedMessagesLogger, times(1)).log("test message");
+    }
+
     @Test
-    public void testnotifyEntities() throws Exception{
+    public void testFailedMessageIsNotLoggedIfNotRequired() throws 
NotificationException {
         List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
-        NotificationInterface notifInterface = 
mock(NotificationInterface.class);
-        doThrow(new NotificationException(new 
Exception())).when(notifInterface)
+        doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message")))
+                .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifInterface = notifInterface;
-        AtlasHook.notifyEntities(hookNotificationMessages, 2);
-        System.out.println("AtlasHook.notifyEntities() returns successfully");
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, false,
+                failedMessagesLogger);
+
+        verifyZeroInteractions(failedMessagesLogger);
+    }
+
+    @Test
+    public void testAllFailedMessagesAreLogged() throws NotificationException {
+        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message1", "test message2")))
+                .when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+                failedMessagesLogger);
+
+        verify(failedMessagesLogger, times(1)).log("test message1");
+        verify(failedMessagesLogger, times(1)).log("test message2");
+    }
+
+    @Test
+    public void testFailedMessageIsNotLoggedIfNotANotificationException() 
throws Exception {
+        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        doThrow(new RuntimeException("test 
message")).when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+                failedMessagesLogger);
+
+        verifyZeroInteractions(failedMessagesLogger);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 219bd70..2a49634 100644
--- 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -22,7 +22,12 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.serializer.StringDecoder;
 import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -30,6 +35,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -39,6 +46,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class KafkaNotificationTest {
 
@@ -77,6 +85,83 @@ public class KafkaNotificationTest {
         assertTrue(consumers.contains(consumer2));
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldSendMessagesSuccessfully() throws NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new 
KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = 
kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message = "This is a test message";
+        Future returnValue = mock(Future.class);
+        when(returnValue.get()).thenReturn(new RecordMetadata(new 
TopicPartition(topicName, 0), 0, 0));
+        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+        when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+        kafkaNotification.sendInternalToProducer(producer,
+                NotificationInterface.NotificationType.HOOK, new 
String[]{message});
+
+        verify(producer).send(expectedRecord);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowExceptionIfProducerFails() throws 
NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new 
KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = 
kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message = "This is a test message";
+        Future returnValue = mock(Future.class);
+        when(returnValue.get()).thenThrow(new RuntimeException("Simulating 
exception"));
+        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+        when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+        try {
+            kafkaNotification.sendInternalToProducer(producer,
+                NotificationInterface.NotificationType.HOOK, new 
String[]{message});
+            fail("Should have thrown NotificationException");
+        } catch (NotificationException e) {
+            assertEquals(e.getFailedMessages().size(), 1);
+            assertEquals(e.getFailedMessages().get(0), "This is a test 
message");
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldCollectAllFailedMessagesIfProducerFails() throws 
NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new 
KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = 
kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message1 = "This is a test message1";
+        String message2 = "This is a test message2";
+        Future returnValue1 = mock(Future.class);
+        when(returnValue1.get()).thenThrow(new RuntimeException("Simulating 
exception"));
+        Future returnValue2 = mock(Future.class);
+        when(returnValue2.get()).thenThrow(new RuntimeException("Simulating 
exception"));
+        ProducerRecord expectedRecord1 = new ProducerRecord(topicName, 
message1);
+        when(producer.send(expectedRecord1)).thenReturn(returnValue1);
+        ProducerRecord expectedRecord2 = new ProducerRecord(topicName, 
message2);
+        when(producer.send(expectedRecord2)).thenReturn(returnValue1);
+
+        try {
+            kafkaNotification.sendInternalToProducer(producer,
+                    NotificationInterface.NotificationType.HOOK, new 
String[]{message1, message2});
+            fail("Should have thrown NotificationException");
+        } catch (NotificationException e) {
+            assertEquals(e.getFailedMessages().size(), 2);
+            assertEquals(e.getFailedMessages().get(0), "This is a test 
message1");
+            assertEquals(e.getFailedMessages().get(1), "This is a test 
message2");
+        }
+    }
+
     class TestKafkaNotification extends KafkaNotification {
 
         private final ConsumerConnector consumerConnector;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b243037..413493f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -23,6 +23,7 @@ ATLAS-409 Atlas will not import avro tables with schema read 
from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons 
(venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-901 Log messages that cannot be sent to Kafka to a specific log 
configuration (yhemanth)
 ATLAS-911 Get entity by unique attribute doesn't enforce type (shwethags)
 ATLAS-899 Fix Hive Hook documentation (sumasai via yhemanth)
 ATLAS-890 Log received messages in case of error (sumasai via yhemanth)

Reply via email to