Hello mooli tayer,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/24706
to review the following change.
Change subject: tools: notifier: implement and use transport.idle().
......................................................................
tools: notifier: implement and use transport.idle().
As part of transitioning to more then one Transport,
It is no longer possible to let a transport sleep
and retry for as long as it wants. that might
starve other transports especially in a short
notification interval.
Retrying transports (those that can detect sending failures)
Should implement a queue and attempt to send it on idle.
This patch also fixes a bug where smtp loging is done
only before the first send attempt.
Change-Id: I8c4bafb542d28cb584e0751446d3e327f93e8112
Related-To: https://bugzilla.redhat.com/show_bug.cgi?id=1051492
Signed-off-by: Mooli Tayer <[email protected]>
Signed-off-by: Alon Bar-Lev <[email protected]>
---
M
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
M
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
M
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
M
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
M
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
M
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
A
backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
M packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
8 files changed, 209 insertions(+), 110 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/06/24706/1
diff --git
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
index 1525f3a..2bbcf71 100644
---
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
+++
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/EngineMonitorService.java
@@ -14,6 +14,8 @@
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
@@ -29,6 +31,7 @@
import org.ovirt.engine.core.common.AuditLogSeverity;
import org.ovirt.engine.core.common.AuditLogType;
import org.ovirt.engine.core.notifier.utils.NotificationProperties;
+import org.ovirt.engine.core.notifier.utils.ShutdownHook;
import org.ovirt.engine.core.tools.common.db.StandaloneDataSource;
import org.ovirt.engine.core.utils.EngineLocalConfig;
import org.ovirt.engine.core.utils.crypt.EngineEncryptionUtils;
@@ -202,11 +205,30 @@
}
}
+ @Override
+ public void run() {
+ ShutdownHook shutdownHook = ShutdownHook.getInstance();
+ ScheduledExecutorService exec =
Executors.newSingleThreadScheduledExecutor();
+ shutdownHook.addScheduledExecutorService(exec);
+ shutdownHook.addServiceHandler(
+ exec.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ mainLogic();
+ }
+ },
+ 1,
+
prop.getLong(NotificationProperties.ENGINE_INTERVAL_IN_SECONDS),
+ TimeUnit.SECONDS
+ )
+ );
+ }
+
/**
* The service monitor the status of the JBoss server using its Health
servlet
*/
- @Override
- public void run() {
+ private void mainLogic() {
try {
monitorEngineServerStatus();
} catch (Throwable e) {
diff --git
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
index 53ce1e4..d5f582d 100644
---
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
+++
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/NotificationService.java
@@ -3,6 +3,9 @@
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
@@ -12,6 +15,7 @@
import org.ovirt.engine.core.notifier.filter.FirstMatchSimpleFilter;
import org.ovirt.engine.core.notifier.transport.Transport;
import org.ovirt.engine.core.notifier.utils.NotificationProperties;
+import org.ovirt.engine.core.notifier.utils.ShutdownHook;
/**
* Responsible for an execution of the service for the current events in the
system which should be notified to the
@@ -40,7 +44,6 @@
this.eventsManager = new EventsManager();
firstMatchSimpleFilter = new FirstMatchSimpleFilter();
configurationFilters =
FirstMatchSimpleFilter.parse(prop.getProperty(FILTER));
- markOldEventsAsProcessed();
}
private void markOldEventsAsProcessed() {
@@ -59,11 +62,44 @@
return !transports.isEmpty();
}
+ @Override
+ public void run() {
+ markOldEventsAsProcessed();
+ ShutdownHook shutdownHook = ShutdownHook.getInstance();
+ ScheduledExecutorService exec =
Executors.newSingleThreadScheduledExecutor();
+ shutdownHook.addScheduledExecutorService(exec);
+ shutdownHook.addServiceHandler(
+ exec.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ mainLogic();
+ }
+ },
+ 1,
+ prop.getLong(NotificationProperties.INTERVAL_IN_SECONDS),
+ TimeUnit.SECONDS
+ )
+ );
+ shutdownHook.addServiceHandler(
+ exec.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ idle();
+ }
+ },
+ 1,
+ prop.getLong(NotificationProperties.IDLE_INTERVAL),
+ TimeUnit.SECONDS
+ )
+ );
+ }
+
/**
* Executes event notification to subscribers
*/
- @Override
- public void run() {
+ private void mainLogic() {
try {
try {
log.debug("Start event notification service iteration");
@@ -108,6 +144,14 @@
}
}
+ private void idle() {
+ log.debug("Begin idle iteration");
+ for (Transport transport : transports) {
+ transport.idle();
+ }
+ log.debug("Finished idle iteration");
+ }
+
private void deleteObsoleteHistoryData() throws SQLException {
eventsManager.deleteObsoleteHistoryData(prop.getInteger(NotificationProperties.DAYS_TO_KEEP_HISTORY));
}
diff --git
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
index 48d3f8c..3b8e4f7 100644
---
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
+++
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/Notifier.java
@@ -2,12 +2,6 @@
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import javax.xml.parsers.FactoryConfigurationError;
@@ -23,9 +17,6 @@
*/
public class Notifier {
private static final Logger log = Logger.getLogger(Notifier.class);
-
- private static ScheduledExecutorService notifyScheduler =
Executors.newSingleThreadScheduledExecutor();
- private static ScheduledExecutorService monitorScheduler =
Executors.newSingleThreadScheduledExecutor();
/**
* Command line argument, that tells Notifier to validate properties only
(it exits after validation)
@@ -58,11 +49,13 @@
initLogging();
NotificationService notificationService = null;
+ EngineMonitorService engineMonitorService = null;
try {
prop = NotificationProperties.getInstance();
prop.validate();
notificationService = new NotificationService(prop);
+ engineMonitorService = new EngineMonitorService(prop);
notificationService.registerTransport(new Smtp(prop));
if (!notificationService.hasTransports()) {
throw new RuntimeException("No transport is enabled, nothing
to do");
@@ -79,62 +72,13 @@
System.exit(0);
}
- NotifierSignalHandler handler = new NotifierSignalHandler();
- handler.addScheduledExecutorService(notifyScheduler);
- handler.addScheduledExecutorService(monitorScheduler);
- Runtime.getRuntime().addShutdownHook(handler);
-
try {
- EngineMonitorService engineMonitorService = new
EngineMonitorService(prop);
-
- // add notification service to scheduler with its configurable
interval
-
handler.addServiceHandler(notifyScheduler.scheduleWithFixedDelay(notificationService,
- 1,
- prop.getLong(NotificationProperties.INTERVAL_IN_SECONDS),
- TimeUnit.SECONDS));
-
- // add engine monitor service to scheduler with its configurable
interval
-
handler.addServiceHandler(monitorScheduler.scheduleWithFixedDelay(engineMonitorService,
- 1,
-
prop.getLong(NotificationProperties.ENGINE_INTERVAL_IN_SECONDS),
- TimeUnit.SECONDS));
+ notificationService.run();
+ engineMonitorService.run();
} catch (Exception e) {
log.error("Failed to run the event notification service. ", e);
// flag exit code to calling script after threads shut down.
System.exit(1);
- }
- }
-
- /**
- * Class designed to handle a proper shutdown in case of an external
signal which was registered was caught by the
- * program.
- */
- public static class NotifierSignalHandler extends Thread {
- private List<ScheduledFuture<?>> serviceHandler = new
ArrayList<ScheduledFuture<?>>();
- private List<ScheduledExecutorService> scheduler = new
ArrayList<ScheduledExecutorService>();
-
- @Override
- public void run() {
- log.info("Preparing for shutdown after receiving signal " );
- if (serviceHandler.size() > 0) {
- for (ScheduledFuture<?> scheduled : serviceHandler) {
- scheduled.cancel(true);
- }
- }
- if (scheduler.size() > 0) {
- for (ScheduledExecutorService executer : scheduler) {
- executer.shutdown();
- }
- }
- log.info("Event Notification service was shutdown");
- }
-
- public void addScheduledExecutorService(ScheduledExecutorService
scheduler) {
- this.scheduler.add(scheduler);
- }
-
- public void addServiceHandler(ScheduledFuture<?> serviceHandler) {
- this.serviceHandler.add(serviceHandler);
}
}
}
diff --git
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
index 9ce80e3..67473723 100644
---
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
+++
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/Transport.java
@@ -32,6 +32,13 @@
*/
public abstract void dispatchEvent(AuditLogEvent event, String address);
+ /**
+ * Upon an idle call a transport performs background tasks if needed.
+ * A default empty implementation is provided.
+ */
+ public void idle() {
+ }
+
@Override
public void notifyObservers(DispatchResult data) {
for (Observer observer : observers) {
diff --git
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
index aef6e7d..ff1ea39 100644
---
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
+++
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/transport/smtp/Smtp.java
@@ -4,7 +4,10 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
+import java.util.Iterator;
import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.mail.Address;
import javax.mail.Authenticator;
@@ -49,10 +52,14 @@
private static final String MAIL_SMTP_ENCRYPTION_NONE = "none";
private static final String MAIL_SMTP_ENCRYPTION_SSL = "ssl";
private static final String MAIL_SMTP_ENCRYPTION_TLS = "tls";
+ private static final String MAIL_SEND_INTERVAL = "MAIL_SEND_INTERVAL";
private static final String MAIL_RETRIES = "MAIL_RETRIES";
private static final Logger log = Logger.getLogger(Smtp.class);
private int retries;
+ private int sendIntervals;
+ private int lastSendInterval = 0;
+ private final Queue<DispatchAttempt> sendQueue = new
LinkedBlockingQueue<>();
private String hostName;
private boolean isBodyHtml = false;
private Session session = null;
@@ -78,6 +85,7 @@
}
retries = props.validateNonNegetive(MAIL_RETRIES);
+ sendIntervals = props.validateNonNegetive(MAIL_SEND_INTERVAL);
isBodyHtml = props.getBoolean(HTML_MESSAGE_FORMAT, false);
from = props.validateEmail(MAIL_FROM);
replyTo = props.validateEmail(MAIL_REPLY_TO);
@@ -138,44 +146,45 @@
public void dispatchEvent(AuditLogEvent event, String address) {
if (StringUtils.isEmpty(address)) {
log.error("Address is empty, cannot distribute message." +
event.getName());
- return;
}
-
- EventMessageContent message = new EventMessageContent();
- message.prepareMessage(hostName, event, isBodyHtml);
-
- log.info(String.format("Send email to [%s]%n subject:%n [%s]",
- address,
- message.getMessageSubject()));
- if (log.isDebugEnabled()) {
- log.debug(String.format("body:%n [%s]",
- message.getMessageBody()));
+ else {
+ sendQueue.add(new DispatchAttempt(event, address));
}
+ }
- String errorMessage = null;
- int retry = 0;
- boolean success = false;
- while (!success && retry < retries) {
- try {
- sendMail(address, message.getMessageSubject(),
message.getMessageBody());
- notifyObservers(DispatchResult.success(event, address,
EventNotificationMethod.EMAIL));
- success = true;
- } catch (MessagingException ex) {
- errorMessage = ex.getMessage();
- }
+ @Override
+ public void idle() {
+ if (lastSendInterval++ >= sendIntervals) {
+ lastSendInterval = 0;
- if (!success) {
+ Iterator<DispatchAttempt> iterator = sendQueue.iterator();
+ while (iterator.hasNext()) {
+ DispatchAttempt attempt = iterator.next();
try {
- Thread.sleep(30000);
- } catch (InterruptedException e) {
- log.error("Failed to suspend thread for 30 seconds while
trying to resend a mail message.", e);
+ EventMessageContent message = new EventMessageContent();
+ message.prepareMessage(hostName, attempt.event,
isBodyHtml);
+
+ log.info(String.format("Send email to [%s]%n subject:%n
[%s]",
+ attempt.address,
+ message.getMessageSubject()));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("body:%n [%s]",
+ message.getMessageBody()));
+ }
+ sendMail(attempt.address, message.getMessageSubject(),
message.getMessageBody());
+ notifyObservers(DispatchResult.success(attempt.event,
attempt.address, EventNotificationMethod.EMAIL));
+ iterator.remove();
+ } catch (Exception ex) {
+ attempt.retries++;
+ if (attempt.retries >= retries) {
+ notifyObservers(DispatchResult.failure(attempt.event,
+ attempt.address,
+ EventNotificationMethod.EMAIL,
+ ex.getMessage()));
+ iterator.remove();
+ }
}
- retry++;
}
- }
- // Could not send after retries.
- if (!success) {
- notifyObservers(DispatchResult.failure(event, address,
EventNotificationMethod.EMAIL, errorMessage));
}
}
@@ -242,5 +251,15 @@
return new PasswordAuthentication(userName, password);
}
}
+
+ private class DispatchAttempt {
+ public final AuditLogEvent event;
+ public final String address;
+ public int retries = 0;
+ private DispatchAttempt(AuditLogEvent event, String address) {
+ this.event = event;
+ this.address = address;
+ }
+ }
}
diff --git
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
index db3b9d5..7c9d471 100644
---
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
+++
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/NotificationProperties.java
@@ -25,6 +25,12 @@
public static final String SSL_IGNORE_HOST_VERIFICATION =
"SSL_IGNORE_HOST_VERIFICATION";
/**
+ * Idle task interval
+ * Interval in seconds to perform low priority tasks.
+ */
+ public static final String IDLE_INTERVAL = "IDLE_INTERVAL";
+
+ /**
* Comma separated list of recipients to be informed in case the
notification service cannot connect to the DB. can
* be empty.
*/
@@ -102,25 +108,16 @@
ENGINE_TIMEOUT_IN_SECONDS,
INTERVAL_IN_SECONDS,
IS_HTTPS_PROTOCOL,
- REPEAT_NON_RESPONSIVE_NOTIFICATION);
+ REPEAT_NON_RESPONSIVE_NOTIFICATION,
+ IDLE_INTERVAL);
// validate non negative args
for (String property : new String[] {
DAYS_TO_KEEP_HISTORY,
DAYS_TO_SEND_ON_STARTUP,
- FAILED_QUERIES_NOTIFICATION_THRESHOLD }) {
- final String stringVal = getProperty(property);
- try {
- int value = Integer.parseInt(stringVal);
- if (value < 0) {
- throw new NumberFormatException();
- }
- } catch (NumberFormatException exception) {
- throw new IllegalArgumentException(
- String.format(
- "'%s' must be a non negative integer.",
- property));
- }
+ FAILED_QUERIES_NOTIFICATION_THRESHOLD,
+ IDLE_INTERVAL }) {
+ validateNonNegetive(property);
}
}
diff --git
a/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
new file mode 100644
index 0000000..501c296
--- /dev/null
+++
b/backend/manager/tools/src/main/java/org/ovirt/engine/core/notifier/utils/ShutdownHook.java
@@ -0,0 +1,57 @@
+package org.ovirt.engine.core.notifier.utils;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Class designed to handle a proper shutdown in case of an external signal
which was registered was caught by the
+ * program.
+ */
+public class ShutdownHook extends Thread {
+
+ private static final Logger log = Logger.getLogger(ShutdownHook.class);
+
+ private List<ScheduledExecutorService> schedulers = new LinkedList<>();
+ private List<ScheduledFuture<?>> serviceHandlers = new LinkedList<>();
+
+ private static volatile ShutdownHook instance;
+
+ public static ShutdownHook getInstance() {
+ if (instance == null) {
+ synchronized(ShutdownHook.class) {
+ if (instance == null) {
+ instance = new ShutdownHook();
+ }
+ }
+ }
+ return instance;
+ }
+
+ private ShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(this);
+ }
+
+ @Override
+ public void run() {
+ log.info("Preparing for shutdown after receiving signal " );
+ for (ScheduledFuture<?> scheduled : serviceHandlers) {
+ scheduled.cancel(true);
+ }
+ for (ScheduledExecutorService executer : schedulers) {
+ executer.shutdown();
+ }
+ log.info("Event Notification service was shutdown");
+ }
+
+ public void addScheduledExecutorService(ScheduledExecutorService
scheduler) {
+ schedulers.add(scheduler);
+ }
+
+ public void addServiceHandler(ScheduledFuture<?> serviceHandler) {
+ serviceHandlers.add(serviceHandler);
+ }
+}
diff --git
a/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
b/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
index 735946d..9f4ccb8 100644
--- a/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
+++ b/packaging/services/ovirt-engine-notifier/ovirt-engine-notifier.conf.in
@@ -50,6 +50,12 @@
# Interval (in seconds) between iterations of dispatching messages to
subscribers. Default is 120 seconds.
INTERVAL_IN_SECONDS=120
+#
+# Idle task interval
+# Interval in seconds to perform low priority tasks.
+#
+IDLE_INTERVAL=30
+
# Amount of days to keep dispatched events on history table. If 0, events
remain on history table for ever.
DAYS_TO_KEEP_HISTORY=0
@@ -119,6 +125,9 @@
# Specifies 'reply-to' address on sent mail in RFC822 format.
MAIL_REPLY_TO=
+# Interval to send smtp messages per # of IDLE_INTERVAL
+MAIL_SEND_INTERVAL=1
+
# Amount of times to attempt sending an email before failing.
MAIL_RETRIES=4
--
To view, visit http://gerrit.ovirt.org/24706
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8c4bafb542d28cb584e0751446d3e327f93e8112
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: ovirt-engine-3.4
Gerrit-Owner: Alon Bar-Lev <[email protected]>
Gerrit-Reviewer: mooli tayer <[email protected]>
_______________________________________________
Engine-patches mailing list
[email protected]
http://lists.ovirt.org/mailman/listinfo/engine-patches