Hello mooli tayer,

I'd like you to do a code review.  Please visit

    http://gerrit.ovirt.org/24706

to review the following change.

Change subject: tools: notifier: implement and use transport.idle().
......................................................................

tools: notifier: implement and use transport.idle().

As part of transitioning to more than one Transport,
it is no longer possible to let a transport sleep
and retry for as long as it wants. That might
starve other transports, especially with a short
notification interval.

Retrying transports (those that can detect sending failures)
should implement a queue and attempt to send the queued messages on idle.

This patch also fixes a bug where SMTP logging is done
only before the first send attempt.

Change-Id: I8c4bafb542d28cb584e0751446d3e327f93e8112
Related-To: https://bugzilla.redhat.com/show_bug.cgi?id=1051492
Signed-off-by: Mooli Tayer <[email protected]>
Signed-off-by: Alon Bar-Lev <[email protected]>
---
M 
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
M 
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
M 
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
M 
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
M 
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
M 
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
A 
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
M packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
8 files changed, 209 insertions(+), 110 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/06/24706/1

diff --git 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
index 1525f3a..2bbcf71 100644
--- 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
+++ 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
@@ -14,6 +14,8 @@
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.HostnameVerifier;
@@ -29,6 +31,7 @@
 import org.ovirt.engine.core.common.AuditLogSeverity;
 import org.ovirt.engine.core.common.AuditLogType;
 import org.ovirt.engine.core.notifier.utils.NotificationProperties;
+import org.ovirt.engine.core.notifier.utils.ShutdownHook;
 import org.ovirt.engine.core.tools.common.db.StandaloneDataSource;
 import org.ovirt.engine.core.utils.EngineLocalConfig;
 import org.ovirt.engine.core.utils.crypt.EngineEncryptionUtils;
@@ -202,11 +205,30 @@
         }
     }
 
+    @Override
+    public void run() {
+        ShutdownHook shutdownHook = ShutdownHook.getInstance();
+        ScheduledExecutorService exec = 
Executors.newSingleThreadScheduledExecutor();
+        shutdownHook.addScheduledExecutorService(exec);
+        shutdownHook.addServiceHandler(
+            exec.scheduleWithFixedDelay(
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        mainLogic();
+                    }
+                },
+                1,
+                
prop.getLong(NotificationProperties.ENGINE_INTERVAL_IN_SECONDS),
+                TimeUnit.SECONDS
+            )
+        );
+    }
+
     /**
      * The service monitor the status of the JBoss server using its Health 
servlet
      */
-    @Override
-    public void run() {
+    private void mainLogic() {
         try {
             monitorEngineServerStatus();
         } catch (Throwable e) {
diff --git 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
index 53ce1e4..d5f582d 100644
--- 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
+++ 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
@@ -3,6 +3,9 @@
 import java.sql.SQLException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
@@ -12,6 +15,7 @@
 import org.ovirt.engine.core.notifier.filter.FirstMatchSimpleFilter;
 import org.ovirt.engine.core.notifier.transport.Transport;
 import org.ovirt.engine.core.notifier.utils.NotificationProperties;
+import org.ovirt.engine.core.notifier.utils.ShutdownHook;
 
 /**
  * Responsible for an execution of the service for the current events in the 
system which should be notified to the
@@ -40,7 +44,6 @@
         this.eventsManager = new EventsManager();
         firstMatchSimpleFilter = new FirstMatchSimpleFilter();
         configurationFilters = 
FirstMatchSimpleFilter.parse(prop.getProperty(FILTER));
-        markOldEventsAsProcessed();
     }
 
     private void markOldEventsAsProcessed() {
@@ -59,11 +62,44 @@
         return !transports.isEmpty();
     }
 
+    @Override
+    public void run() {
+        markOldEventsAsProcessed();
+        ShutdownHook shutdownHook = ShutdownHook.getInstance();
+        ScheduledExecutorService exec = 
Executors.newSingleThreadScheduledExecutor();
+        shutdownHook.addScheduledExecutorService(exec);
+        shutdownHook.addServiceHandler(
+            exec.scheduleWithFixedDelay(
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        mainLogic();
+                    }
+                },
+                1,
+                prop.getLong(NotificationProperties.INTERVAL_IN_SECONDS),
+                TimeUnit.SECONDS
+            )
+        );
+        shutdownHook.addServiceHandler(
+            exec.scheduleWithFixedDelay(
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        idle();
+                    }
+                },
+                1,
+                prop.getLong(NotificationProperties.IDLE_INTERVAL),
+                TimeUnit.SECONDS
+            )
+        );
+    }
+
     /**
      * Executes event notification to subscribers
      */
-    @Override
-    public void run() {
+    private void mainLogic() {
         try {
             try {
                 log.debug("Start event notification service iteration");
@@ -108,6 +144,14 @@
         }
     }
 
+    private void idle() {
+        log.debug("Begin idle iteration");
+        for (Transport transport : transports) {
+            transport.idle();
+        }
+        log.debug("Finished idle iteration");
+    }
+
     private void deleteObsoleteHistoryData() throws SQLException {
         
eventsManager.deleteObsoleteHistoryData(prop.getInteger(NotificationProperties.DAYS_TO_KEEP_HISTORY));
     }
diff --git 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
index 48d3f8c..3b8e4f7 100644
--- 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
+++ 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
@@ -2,12 +2,6 @@
 
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import javax.xml.parsers.FactoryConfigurationError;
 
@@ -23,9 +17,6 @@
  */
 public class Notifier {
     private static final Logger log = Logger.getLogger(Notifier.class);
-
-    private static ScheduledExecutorService notifyScheduler = 
Executors.newSingleThreadScheduledExecutor();
-    private static ScheduledExecutorService monitorScheduler = 
Executors.newSingleThreadScheduledExecutor();
 
     /**
      * Command line argument, that tells Notifier to validate properties only 
(it exits after validation)
@@ -58,11 +49,13 @@
         initLogging();
 
         NotificationService notificationService = null;
+        EngineMonitorService engineMonitorService = null;
 
         try {
             prop = NotificationProperties.getInstance();
             prop.validate();
             notificationService = new NotificationService(prop);
+            engineMonitorService = new EngineMonitorService(prop);
             notificationService.registerTransport(new Smtp(prop));
             if (!notificationService.hasTransports()) {
                 throw new RuntimeException("No transport is enabled, nothing 
to do");
@@ -79,62 +72,13 @@
             System.exit(0);
         }
 
-        NotifierSignalHandler handler = new NotifierSignalHandler();
-        handler.addScheduledExecutorService(notifyScheduler);
-        handler.addScheduledExecutorService(monitorScheduler);
-        Runtime.getRuntime().addShutdownHook(handler);
-
         try {
-            EngineMonitorService engineMonitorService = new 
EngineMonitorService(prop);
-
-            // add notification service to scheduler with its configurable 
interval
-            
handler.addServiceHandler(notifyScheduler.scheduleWithFixedDelay(notificationService,
-                    1,
-                    prop.getLong(NotificationProperties.INTERVAL_IN_SECONDS),
-                    TimeUnit.SECONDS));
-
-            // add engine monitor service to scheduler with its configurable 
interval
-            
handler.addServiceHandler(monitorScheduler.scheduleWithFixedDelay(engineMonitorService,
-                    1,
-                    
prop.getLong(NotificationProperties.ENGINE_INTERVAL_IN_SECONDS),
-                    TimeUnit.SECONDS));
+            notificationService.run();
+            engineMonitorService.run();
         } catch (Exception e) {
             log.error("Failed to run the event notification service. ", e);
             // flag exit code to calling script after threads shut down.
             System.exit(1);
-        }
-    }
-
-    /**
-     * Class designed to handle a proper shutdown in case of an external 
signal which was registered was caught by the
-     * program.
-     */
-    public static class NotifierSignalHandler extends Thread {
-        private List<ScheduledFuture<?>> serviceHandler = new 
ArrayList<ScheduledFuture<?>>();
-        private List<ScheduledExecutorService> scheduler = new 
ArrayList<ScheduledExecutorService>();
-
-        @Override
-        public void run() {
-            log.info("Preparing for shutdown after receiving signal " );
-            if (serviceHandler.size() > 0) {
-                for (ScheduledFuture<?> scheduled : serviceHandler) {
-                    scheduled.cancel(true);
-                }
-            }
-            if (scheduler.size() > 0) {
-                for (ScheduledExecutorService executer : scheduler) {
-                    executer.shutdown();
-                }
-            }
-            log.info("Event Notification service was shutdown");
-        }
-
-        public void addScheduledExecutorService(ScheduledExecutorService 
scheduler) {
-            this.scheduler.add(scheduler);
-        }
-
-        public void addServiceHandler(ScheduledFuture<?> serviceHandler) {
-            this.serviceHandler.add(serviceHandler);
         }
     }
 }
diff --git 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
index 9ce80e3..67473723 100644
--- 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
+++ 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
@@ -32,6 +32,13 @@
      */
     public abstract void dispatchEvent(AuditLogEvent event, String address);
 
+    /**
+     * Upon an idle call a transport performs background tasks if needed.
+     * A default empty implementation is provided.
+     */
+    public void idle() {
+    }
+
     @Override
     public void notifyObservers(DispatchResult data) {
         for (Observer observer : observers) {
diff --git 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
index aef6e7d..ff1ea39 100644
--- 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
+++ 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
@@ -4,7 +4,10 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.mail.Address;
 import javax.mail.Authenticator;
@@ -49,10 +52,14 @@
     private static final String MAIL_SMTP_ENCRYPTION_NONE = "none";
     private static final String MAIL_SMTP_ENCRYPTION_SSL = "ssl";
     private static final String MAIL_SMTP_ENCRYPTION_TLS = "tls";
+    private static final String MAIL_SEND_INTERVAL = "MAIL_SEND_INTERVAL";
     private static final String MAIL_RETRIES = "MAIL_RETRIES";
 
     private static final Logger log = Logger.getLogger(Smtp.class);
     private int retries;
+    private int sendIntervals;
+    private int lastSendInterval = 0;
+    private final Queue<DispatchAttempt> sendQueue = new 
LinkedBlockingQueue<>();
     private String hostName;
     private boolean isBodyHtml = false;
     private Session session = null;
@@ -78,6 +85,7 @@
         }
 
         retries = props.validateNonNegetive(MAIL_RETRIES);
+        sendIntervals = props.validateNonNegetive(MAIL_SEND_INTERVAL);
         isBodyHtml = props.getBoolean(HTML_MESSAGE_FORMAT, false);
         from = props.validateEmail(MAIL_FROM);
         replyTo = props.validateEmail(MAIL_REPLY_TO);
@@ -138,44 +146,45 @@
     public void dispatchEvent(AuditLogEvent event, String address) {
         if (StringUtils.isEmpty(address)) {
             log.error("Address is empty, cannot distribute message." + 
event.getName());
-            return;
         }
-
-        EventMessageContent message = new EventMessageContent();
-        message.prepareMessage(hostName, event, isBodyHtml);
-
-        log.info(String.format("Send email to [%s]%n subject:%n [%s]",
-                address,
-                message.getMessageSubject()));
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("body:%n [%s]",
-                    message.getMessageBody()));
+        else {
+            sendQueue.add(new DispatchAttempt(event, address));
         }
+    }
 
-        String errorMessage = null;
-        int retry = 0;
-        boolean success = false;
-        while (!success && retry < retries) {
-            try {
-                sendMail(address, message.getMessageSubject(), 
message.getMessageBody());
-                notifyObservers(DispatchResult.success(event, address, 
EventNotificationMethod.EMAIL));
-                success = true;
-            } catch (MessagingException ex) {
-                errorMessage = ex.getMessage();
-            }
+    @Override
+    public void idle() {
+        if (lastSendInterval++ >= sendIntervals) {
+            lastSendInterval = 0;
 
-            if (!success) {
+            Iterator<DispatchAttempt> iterator = sendQueue.iterator();
+            while (iterator.hasNext()) {
+                DispatchAttempt attempt = iterator.next();
                 try {
-                    Thread.sleep(30000);
-                } catch (InterruptedException e) {
-                    log.error("Failed to suspend thread for 30 seconds while 
trying to resend a mail message.", e);
+                    EventMessageContent message = new EventMessageContent();
+                    message.prepareMessage(hostName, attempt.event, 
isBodyHtml);
+
+                    log.info(String.format("Send email to [%s]%n subject:%n 
[%s]",
+                            attempt.address,
+                            message.getMessageSubject()));
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("body:%n [%s]",
+                                message.getMessageBody()));
+                    }
+                    sendMail(attempt.address, message.getMessageSubject(), 
message.getMessageBody());
+                    notifyObservers(DispatchResult.success(attempt.event, 
attempt.address, EventNotificationMethod.EMAIL));
+                    iterator.remove();
+                } catch (Exception ex) {
+                    attempt.retries++;
+                    if (attempt.retries >= retries) {
+                        notifyObservers(DispatchResult.failure(attempt.event,
+                                attempt.address,
+                                EventNotificationMethod.EMAIL,
+                                ex.getMessage()));
+                        iterator.remove();
+                    }
                 }
-                retry++;
             }
-        }
-        // Could not send after retries.
-        if (!success) {
-            notifyObservers(DispatchResult.failure(event, address, 
EventNotificationMethod.EMAIL, errorMessage));
         }
     }
 
@@ -242,5 +251,15 @@
             return new PasswordAuthentication(userName, password);
         }
     }
+
+    private class DispatchAttempt {
+         public final AuditLogEvent event;
+         public final String address;
+         public int retries = 0;
+         private DispatchAttempt(AuditLogEvent event, String address) {
+             this.event = event;
+             this.address = address;
+         }
+     }
 }
 
diff --git 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
index db3b9d5..7c9d471 100644
--- 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
+++ 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
@@ -25,6 +25,12 @@
     public static final String SSL_IGNORE_HOST_VERIFICATION = 
"SSL_IGNORE_HOST_VERIFICATION";
 
     /**
+     * Idle task interval
+     * Interval in seconds to perform low priority tasks.
+     */
+    public static final String IDLE_INTERVAL = "IDLE_INTERVAL";
+
+    /**
      * Comma separated list of recipients to be informed in case the 
notification service cannot connect to the DB. can
      * be empty.
      */
@@ -102,25 +108,16 @@
                 ENGINE_TIMEOUT_IN_SECONDS,
                 INTERVAL_IN_SECONDS,
                 IS_HTTPS_PROTOCOL,
-                REPEAT_NON_RESPONSIVE_NOTIFICATION);
+                REPEAT_NON_RESPONSIVE_NOTIFICATION,
+                IDLE_INTERVAL);
 
         // validate non negative args
         for (String property : new String[] {
                 DAYS_TO_KEEP_HISTORY,
                 DAYS_TO_SEND_ON_STARTUP,
-                FAILED_QUERIES_NOTIFICATION_THRESHOLD }) {
-            final String stringVal = getProperty(property);
-            try {
-                int value = Integer.parseInt(stringVal);
-                if (value < 0) {
-                    throw new NumberFormatException();
-                }
-            } catch (NumberFormatException exception) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "'%s' must be a non negative integer.",
-                                property));
-            }
+                FAILED_QUERIES_NOTIFICATION_THRESHOLD,
+                IDLE_INTERVAL }) {
+            validateNonNegetive(property);
         }
     }
 
diff --git 
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
new file mode 100644
index 0000000..501c296
--- /dev/null
+++ 
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
@@ -0,0 +1,57 @@
+package org.ovirt.engine.core.notifier.utils;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Class designed to handle a proper shutdown in case of an external signal 
which was registered was caught by the
+ * program.
+ */
+public class ShutdownHook extends Thread {
+
+    private static final Logger log = Logger.getLogger(ShutdownHook.class);
+
+    private List<ScheduledExecutorService> schedulers = new LinkedList<>();
+    private List<ScheduledFuture<?>> serviceHandlers = new LinkedList<>();
+
+    private static volatile ShutdownHook instance;
+
+    public static ShutdownHook getInstance() {
+        if (instance == null) {
+            synchronized(ShutdownHook.class) {
+                if (instance == null) {
+                    instance = new ShutdownHook();
+                }
+            }
+        }
+        return instance;
+    }
+
+    private ShutdownHook() {
+        Runtime.getRuntime().addShutdownHook(this);
+    }
+
+    @Override
+    public void run() {
+        log.info("Preparing for shutdown after receiving signal " );
+        for (ScheduledFuture<?> scheduled : serviceHandlers) {
+            scheduled.cancel(true);
+        }
+        for (ScheduledExecutorService executer : schedulers) {
+            executer.shutdown();
+        }
+        log.info("Event Notification service was shutdown");
+    }
+
+    public void addScheduledExecutorService(ScheduledExecutorService 
scheduler) {
+        schedulers.add(scheduler);
+    }
+
+    public void addServiceHandler(ScheduledFuture<?> serviceHandler) {
+        serviceHandlers.add(serviceHandler);
+    }
+}
diff --git 
a/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in 
b/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
index 735946d..9f4ccb8 100644
--- a/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
+++ b/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
@@ -50,6 +50,12 @@
 # Interval (in seconds) between iterations of dispatching messages to 
subscribers. Default is 120 seconds.
 INTERVAL_IN_SECONDS=120
 
+#
+# Idle task interval
+# Interval in seconds to perform low priority tasks.
+#
+IDLE_INTERVAL=30
+
 # Amount of days to keep dispatched events on history table. If 0, events 
remain on history table for ever.
 DAYS_TO_KEEP_HISTORY=0
 
@@ -119,6 +125,9 @@
 # Specifies 'reply-to' address on sent mail in RFC822 format.
 MAIL_REPLY_TO=
 
+# Interval to send smtp messages per # of IDLE_INTERVAL
+MAIL_SEND_INTERVAL=1
+
 # Amount of times to attempt sending an email before failing.
 MAIL_RETRIES=4
 


-- 
To view, visit http://gerrit.ovirt.org/24706
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8c4bafb542d28cb584e0751446d3e327f93e8112
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: ovirt-engine-3.4
Gerrit-Owner: Alon Bar-Lev <[email protected]>
Gerrit-Reviewer: mooli tayer <[email protected]>
_______________________________________________
Engine-patches mailing list
[email protected]
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to