Repository: incubator-eagle Updated Branches: refs/heads/master a89275bfc -> 725e73377
[EAGLE-843] Refactor application shared service registry framework Refactor application shared service registry framework * Add `Optional<List<Service>> getSharedServices(Config envConfig)` in `ApplicationProvider` * Move `MRHistoryJobDailyReporter` registry to `MRHistoryJobApplicationProvider` from `ServerApplication` * Register `getSharedServices` from `ApplicationProvider` to `Environment` in `ServerApplication` * Add `EmbeddedMailService` https://issues.apache.org/jira/browse/EAGLE-843 Author: Hao Chen <h...@apache.org> Closes #750 from haoch/RefactorAppServicesFramework. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/725e7337 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/725e7337 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/725e7337 Branch: refs/heads/master Commit: 725e733778dd405c2056249e766d342cbed32e8a Parents: a89275b Author: Hao Chen <h...@apache.org> Authored: Mon Dec 19 11:16:55 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Mon Dec 19 11:16:55 2016 +0800 ---------------------------------------------------------------------- .../publisher/AlertEmailPublisherTest.java | 16 +-- .../impl/ApplicationHealthCheckServiceImpl.java | 2 +- .../eagle/app/spi/ApplicationProvider.java | 17 ++- .../queue/HadoopQueueRunningAppProvider.java | 2 +- .../MRHistoryJobApplicationProvider.java | 16 ++- .../mr/history/MRHistoryJobDailyReporter.java | 15 +- .../history/MRHistoryJobDailyReporterTest.java | 6 +- .../history/SparkHistoryJobAppProvider.java | 2 +- eagle-server/pom.xml | 6 + .../apache/eagle/server/ServerApplication.java | 39 +++-- .../eagle/server/task/ApplicationTask.java | 42 ------ .../eagle/server/task/ManagedService.java | 43 ++++++ .../eagle/server/EmbeddedMailService.java | 141 +++++++++++++++++++ .../org/apache/eagle/server/ServerDebug.java | 7 +- .../topology/TopologyCheckAppProvider.java | 2 +- 15 files changed, 278 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java index 1f131a9..50fb07d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java @@ -38,14 +38,14 @@ public class AlertEmailPublisherTest { private SimpleSmtpServer server; @Before - public void setUp(){ + public void setUp() { config = ConfigFactory.load("application-test.conf"); server = SimpleSmtpServer.start(SMTP_PORT); } @After - public void clear(){ - if(server!=null) { + public void clear() { + if (server != null) { server.stop(); } } @@ -54,9 +54,9 @@ public class AlertEmailPublisherTest { public void testAlertEmailPublisher() throws Exception { AlertEmailPublisher publisher = new AlertEmailPublisher(); Map<String, Object> properties = new HashMap<>(); - properties.put(PublishConstants.SUBJECT,EMAIL_PUBLISHER_TEST_POLICY); - properties.put(PublishConstants.SENDER,"eagle@localhost"); - properties.put(PublishConstants.RECIPIENTS,"somebody@localhost"); + properties.put(PublishConstants.SUBJECT, EMAIL_PUBLISHER_TEST_POLICY); + properties.put(PublishConstants.SENDER, "eagle@localhost"); + properties.put(PublishConstants.RECIPIENTS, "somebody@localhost"); Publishment publishment = new Publishment(); publishment.setName("testEmailPublishment"); publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName()); @@ -65,9 +65,9 @@ public class AlertEmailPublisherTest { publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName()); publishment.setProperties(properties); Map<String, String> conf = new HashMap<>(); - publisher.init(config, publishment,conf); + publisher.init(config, publishment, conf); publisher.onAlert(AlertPublisherTestHelper.mockEvent(EMAIL_PUBLISHER_TEST_POLICY)); - Assert.assertEquals(1,server.getReceivedEmailSize()); + Assert.assertEquals(1, server.getReceivedEmailSize()); Assert.assertTrue(server.getReceivedEmail().hasNext()); LOG.info("EMAIL:\n {}", server.getReceivedEmail().next()); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java index b90d18b..d1d4360 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java @@ -118,7 +118,7 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer return; } ApplicationProvider<?> appProvider = applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType()); - Optional<HealthCheck> applicationHealthCheck = appProvider.getAppHealthCheck( + Optional<HealthCheck> applicationHealthCheck = appProvider.getManagedHealthCheck( ConfigFactory.parseMap(appEntity.getContext()) .withFallback(config) .withFallback(ConfigFactory.parseMap(appEntity.getConfiguration())) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java index fbae411..eff232a 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java @@ -18,6 +18,7 @@ package org.apache.eagle.app.spi; import com.codahale.metrics.health.HealthCheck; +import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; import org.apache.eagle.app.Application; import org.apache.eagle.app.service.ApplicationListener; @@ -25,6 +26,7 @@ import org.apache.eagle.common.module.ModuleRegistry; import org.apache.eagle.metadata.model.ApplicationDesc; import java.lang.reflect.ParameterizedType; +import java.util.List; import java.util.Optional; /** @@ -72,7 +74,20 @@ public interface ApplicationProvider<T extends Application> { */ void register(ModuleRegistry registry); - default Optional<HealthCheck> getAppHealthCheck(Config config) { + /** + * @param config application config. + * @return Application-specific managed health check. + */ + default Optional<HealthCheck> getManagedHealthCheck(Config config) { + return Optional.empty(); + } + + /** + * + * @param envConfig server environment config. + * @return Server-level shared services. + */ + default Optional<List<Service>> getSharedServices(Config envConfig) { return Optional.empty(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java index 090b3f3..fccd0df 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java @@ -28,7 +28,7 @@ public class HadoopQueueRunningAppProvider extends AbstractApplicationProvider<H } @Override - public Optional<HealthCheck> getAppHealthCheck(Config config) { + public Optional<HealthCheck> getManagedHealthCheck(Config config) { return Optional.of(new HadoopQueueRunningApplicationHealthCheck(config)); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java index 89f20ec..8751e73 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java @@ -17,10 +17,15 @@ package org.apache.eagle.jpm.mr.history; import com.codahale.metrics.health.HealthCheck; +import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; +import io.dropwizard.lifecycle.Managed; import org.apache.eagle.app.service.ApplicationListener; import org.apache.eagle.app.spi.AbstractApplicationProvider; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Optional; public class MRHistoryJobApplicationProvider extends AbstractApplicationProvider<MRHistoryJobApplication> { @@ -35,7 +40,16 @@ public class MRHistoryJobApplicationProvider extends AbstractApplicationProvider } @Override - public Optional<HealthCheck> getAppHealthCheck(Config config) { + public Optional<HealthCheck> getManagedHealthCheck(Config config) { return Optional.of(new MRHistoryJobApplicationHealthCheck(config)); } + + @Override + public Optional<List<Service>> getSharedServices(Config envConfig) { + if (envConfig.hasPath(MRHistoryJobDailyReporter.SERVICE_PATH)) { + return Optional.of(Collections.singletonList(new MRHistoryJobDailyReporter(envConfig))); + } else { + return Optional.empty(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java index 0dc6c5f..9aef640 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java @@ -71,9 +71,10 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { private static final String SUCCEEDED_JOB_QUERY = "%s[@site=\"%s\" and @currentState=\"SUCCEEDED\" and @durationTime>%s and @endTime<=%s]<@user>{count}.{count desc}"; private static final String FINISHED_JOB_QUERY = "%s[@site=\"%s\" and @endTime<=%s]<@user>{count}.{count desc}"; - private Config config; + private final Config config; + private IEagleServiceClient client; - private ApplicationEntityService applicationResource; + private ApplicationEmailService emailService; private boolean isDailySent = false; private long lastSentTime; @@ -89,9 +90,11 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { private TimeZone timeZone; @Inject - public MRHistoryJobDailyReporter(Config config, ApplicationEntityService applicationEntityService) { - this.timeZone = TimeZone.getTimeZone(config.getString(EAGLE_TIME_ZONE)); + private ApplicationEntityService applicationEntityService; + public MRHistoryJobDailyReporter(Config config) { + this.config = config; + this.timeZone = TimeZone.getTimeZone(config.getString(EAGLE_TIME_ZONE)); if (config.hasPath(SERVICE_PATH) && config.hasPath(AlertEmailConstants.EAGLE_APPLICATION_EMAIL_SERVICE)) { this.emailService = new ApplicationEmailService(config, SERVICE_PATH); } @@ -107,8 +110,6 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { if (config.hasPath(JOB_OVERTIME_LIMIT_HOUR)) { this.jobOvertimeLimit = config.getInt(JOB_OVERTIME_LIMIT_HOUR); } - this.config = config; - this.applicationResource = applicationEntityService; } private boolean isSentHour(int currentHour) { @@ -117,7 +118,7 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { private Collection<String> loadSites(String appType) { Set<String> sites = new HashSet<>(); - Collection<ApplicationEntity> apps = applicationResource.findAll(); + Collection<ApplicationEntity> apps = applicationEntityService.findAll(); for (ApplicationEntity app : apps) { if (app.getDescriptor().getType().equalsIgnoreCase(appType) && app.getStatus().equals(ApplicationEntity.Status.RUNNING)) { sites.add(app.getSite().getSiteId()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java index 3b297ae..73d1151 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java @@ -47,15 +47,15 @@ public class MRHistoryJobDailyReporterTest { } @After - public void clear(){ - if(server!=null) { + public void clear() { + if ( server != null ) { server.stop(); } } @Test public void test() throws Exception { - MRHistoryJobDailyReporter reporter = new MRHistoryJobDailyReporter(config, null); + MRHistoryJobDailyReporter reporter = new MRHistoryJobDailyReporter(config); reporter.sendByEmail(mockAlertData()); Iterator it = server.getReceivedEmail(); Assert.assertTrue(server.getReceivedEmailSize() == 1); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java index 366d8cb..dcf335c 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java @@ -30,7 +30,7 @@ public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<Spar } @Override - public Optional<HealthCheck> getAppHealthCheck(Config config) { + public Optional<HealthCheck> getManagedHealthCheck(Config config) { return Optional.of(new SparkHistoryJobApplicationHealthCheck(config)); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-server/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml index 32a5020..9a78b54 100644 --- a/eagle-server/pom.xml +++ b/eagle-server/pom.xml @@ -177,6 +177,12 @@ <groupId>javax.ws.rs</groupId> <artifactId>javax.ws.rs-api</artifactId> </dependency> + <dependency> + <groupId>dumbster</groupId> + <artifactId>dumbster</artifactId> + <version>1.6</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java index 1d78ed1..ccf3c28 100644 --- a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java +++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java @@ -17,6 +17,7 @@ package org.apache.eagle.server; import com.google.inject.Inject; +import com.google.inject.Injector; import com.hubspot.dropwizard.guice.GuiceBundle; import com.sun.jersey.api.core.PackagesResourceConfig; import com.typesafe.config.Config; @@ -30,13 +31,14 @@ import io.swagger.jaxrs.listing.ApiListingResource; import org.apache.eagle.alert.coordinator.CoordinatorListener; import org.apache.eagle.alert.resource.SimpleCORSFiler; import org.apache.eagle.app.service.ApplicationHealthCheckService; +import org.apache.eagle.app.service.ApplicationProviderService; +import org.apache.eagle.app.spi.ApplicationProvider; import org.apache.eagle.common.Version; -import org.apache.eagle.jpm.mr.history.MRHistoryJobDailyReporter; import org.apache.eagle.log.base.taggedlog.EntityJsonModule; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.metadata.service.ApplicationStatusUpdateService; import org.apache.eagle.server.authentication.BasicAuthProviderBuilder; -import org.apache.eagle.server.task.ApplicationTask; +import org.apache.eagle.server.task.ManagedService; import org.apache.eagle.server.module.GuiceBundleLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +53,9 @@ class ServerApplication extends Application<ServerConfig> { @Inject private ApplicationHealthCheckService applicationHealthCheckService; @Inject - private MRHistoryJobDailyReporter mrHistoryJobDailyReporter; + private ApplicationProviderService applicationProviderService; + @Inject + private Injector injector; @Inject private Config config; @@ -107,18 +111,31 @@ class ServerApplication extends Application<ServerConfig> { // Context listener environment.servlets().addServletListeners(new CoordinatorListener()); + registerAppServices(environment); + } + + private void registerAppServices(Environment environment) { // Run application status service in background - Managed updateAppStatusTask = new ApplicationTask(applicationStatusUpdateService); + LOG.debug("Registering ApplicationStatusUpdateService"); + Managed updateAppStatusTask = new ManagedService(applicationStatusUpdateService); environment.lifecycle().manage(updateAppStatusTask); - // Initialize application health check environment + // Initialize application extended health checks. + LOG.debug("Registering ApplicationHealthCheckService"); applicationHealthCheckService.init(environment); - Managed appHealthCheckTask = new ApplicationTask(applicationHealthCheckService); - environment.lifecycle().manage(appHealthCheckTask); - - if (config.hasPath(MRHistoryJobDailyReporter.SERVICE_PATH)) { - Managed jobReportTask = new ApplicationTask(mrHistoryJobDailyReporter); - environment.lifecycle().manage(jobReportTask); + environment.lifecycle().manage(new ManagedService(applicationHealthCheckService)); + + // Load application shared extension services. + LOG.debug("Registering application shared extension services"); + for (ApplicationProvider<?> applicationProvider : applicationProviderService.getProviders()) { + applicationProvider.getSharedServices(config).ifPresent((services -> { + services.forEach(service -> { + LOG.info("Registering {} for {}", service.getClass().getCanonicalName(),applicationProvider.getApplicationDesc().getType()); + injector.injectMembers(service); + environment.lifecycle().manage(new ManagedService(service)); + }); + LOG.info("Registered {} services for {}", services.size(), applicationProvider.getApplicationDesc().getType()); + })); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-server/src/main/java/org/apache/eagle/server/task/ApplicationTask.java ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/java/org/apache/eagle/server/task/ApplicationTask.java b/eagle-server/src/main/java/org/apache/eagle/server/task/ApplicationTask.java deleted file mode 100644 index 0cddee7..0000000 --- a/eagle-server/src/main/java/org/apache/eagle/server/task/ApplicationTask.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.server.task; - -import com.google.common.util.concurrent.AbstractScheduledService; -import io.dropwizard.lifecycle.Managed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ApplicationTask implements Managed { - private static final Logger LOG = LoggerFactory.getLogger(ApplicationTask.class); - private final AbstractScheduledService service; - - public ApplicationTask(AbstractScheduledService service) { - this.service = service; - } - - @Override - public void start() throws Exception { - LOG.info("Application update task started:"); - service.startAsync().awaitRunning(); - } - - @Override - public void stop() throws Exception { - service.stopAsync().awaitTerminated(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-server/src/main/java/org/apache/eagle/server/task/ManagedService.java ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/java/org/apache/eagle/server/task/ManagedService.java b/eagle-server/src/main/java/org/apache/eagle/server/task/ManagedService.java new file mode 100644 index 0000000..65909ce --- /dev/null +++ b/eagle-server/src/main/java/org/apache/eagle/server/task/ManagedService.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.server.task; + +import com.google.common.util.concurrent.Service; +import io.dropwizard.lifecycle.Managed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ManagedService implements Managed { + private static final Logger LOG = LoggerFactory.getLogger(ManagedService.class); + private final Service service; + + public ManagedService(Service service) { + this.service = service; + } + + @Override + public void start() throws Exception { + LOG.info("Starting service {}", service.toString()); + service.startAsync().awaitRunning(); + } + + @Override + public void stop() throws Exception { + LOG.info("Stopping service {}", service.toString()); + service.stopAsync().awaitTerminated(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-server/src/test/java/org/apache/eagle/server/EmbeddedMailService.java ---------------------------------------------------------------------- diff --git a/eagle-server/src/test/java/org/apache/eagle/server/EmbeddedMailService.java b/eagle-server/src/test/java/org/apache/eagle/server/EmbeddedMailService.java new file mode 100644 index 0000000..40aa3ca --- /dev/null +++ b/eagle-server/src/test/java/org/apache/eagle/server/EmbeddedMailService.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.server; + +import com.dumbster.smtp.SimpleSmtpServer; +import com.dumbster.smtp.SmtpMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.*; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.*; + +@Path("/mail") +public class EmbeddedMailService { + private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedMailService.class); + private static int SMTP_PORT = 5025; + private static String SMTP_HOST = "localhost"; + private static SimpleSmtpServer SMTP_SERVER = null; + private static final String MESSAGET_ID = "Message-ID"; + + static { + try { + SMTP_HOST = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + LOGGER.error(e.getMessage(), e); + } + boolean success = false; + int attempt = 0; + while (!success && attempt < 3) { + try { + SMTP_PORT = SMTP_PORT + attempt; + LOGGER.info("Starting Local SMTP service: smtp://{}:{}", SMTP_HOST, SMTP_PORT, attempt); + SMTP_SERVER = SimpleSmtpServer.start(SMTP_PORT + attempt); + success = true; + } catch (Exception ex) { + LOGGER.warn("Failed to start SMTP service, attempt {}", attempt + 1, ex); + success = false; + } finally { + attempt++; + } + } + if (!success) { + LOGGER.error("Failed to start SMTP Server, exceeded max attempt times: 3"); + throw new IllegalStateException("Failed to start SMTP Server, exceeded max attempt times: 3"); + } + } + + @Context + UriInfo uri; + + @GET + @Path("") + @Produces(MediaType.APPLICATION_JSON) + public Map<String, Object> getMailServiceInfo() throws UnknownHostException { + String baseUri = uri.getBaseUri().toASCIIString(); + Iterator<SmtpMessage> messageIterator = SMTP_SERVER.getReceivedEmail(); + List<Map<String, Object>> receivedEmails = new ArrayList<>(SMTP_SERVER.getReceivedEmailSize()); + while (messageIterator.hasNext()) { + receivedEmails.add(convertEmail(messageIterator.next())); + } + + return new HashMap<String, Object>() {{ + put("smtp_server", createSMTPInfo()); + put("email_size", SMTP_SERVER.getReceivedEmailSize()); + put("emails", receivedEmails); + }}; + } + + private Map<String, Object> createSMTPInfo() { + return new HashMap<String, Object>() {{ + put("stopped", SMTP_SERVER.isStopped()); + put("host", SMTP_HOST); + put("port", SMTP_PORT); + put("auth", false); + }}; + } + + private Map<String, Object> convertEmail(SmtpMessage message) { + String baseUri = uri.getBaseUri().toASCIIString(); + + Map<String, String> headers = new HashMap<>(); + message.getHeaderNames().forEachRemaining(headerName -> { + headers.put((String) headerName, message.getHeaderValue((String) headerName)); + }); + return new HashMap<String, Object>() {{ + put("headers", headers); + put("body", message.getBody()); + put("urls", new HashMap<String, String>() {{ + put("json_url", baseUri + "mail/email/" + headers.get(MESSAGET_ID) + "?format=json"); + put("html_url", baseUri + "mail/email/" + headers.get(MESSAGET_ID) + "?format=html"); + }}); + }}; + } + + @GET + @Path("/email/{messageId}") + public Response getEmailByMessageId(@PathParam("messageId") String messageId, @QueryParam("format") String format) throws UnknownHostException { + Iterator<SmtpMessage> messageIterator = SMTP_SERVER.getReceivedEmail(); + while (messageIterator.hasNext()) { + SmtpMessage message = messageIterator.next(); + if (message.getHeaderValue("Message-ID").equals(messageId)) { + if (format != null && format.equalsIgnoreCase("html")) { + return Response.ok(message.getBody()).type(MediaType.TEXT_HTML_TYPE).build(); + } else { + return Response.ok(convertEmail(message)).type(MediaType.APPLICATION_JSON_TYPE).build(); + } + } + } + return Response.status(Response.Status.BAD_REQUEST).entity("Unknown Message-ID: " + messageId).build(); + } + + @POST + @Path("/smtp/reset") + @Produces(MediaType.APPLICATION_JSON) + public Map<String, Object> resetSMTPServer() throws UnknownHostException { + LOGGER.info("Resetting Local SMTP Server: smtp://{}:{}", SMTP_HOST, SMTP_PORT); + SMTP_SERVER.stop(); + SMTP_SERVER = SimpleSmtpServer.start(SMTP_PORT); + return getMailServiceInfo(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-server/src/test/java/org/apache/eagle/server/ServerDebug.java ---------------------------------------------------------------------- diff --git a/eagle-server/src/test/java/org/apache/eagle/server/ServerDebug.java b/eagle-server/src/test/java/org/apache/eagle/server/ServerDebug.java index f870f2a..f47f2bf 100644 --- a/eagle-server/src/test/java/org/apache/eagle/server/ServerDebug.java +++ b/eagle-server/src/test/java/org/apache/eagle/server/ServerDebug.java @@ -34,10 +34,15 @@ public class ServerDebug { System.setProperty("config.resource","application.conf"); } - // String userDir = System.getProperty("user.dir"); LOGGER.info("user.dir = {}", userDir); serverConf = userDir + "/eagle-server/src/test/resources/configuration.yml"; + + try { + Class.forName(EmbeddedMailService.class.getName()); + } catch (ClassNotFoundException e) { + // Do nothing + } } public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/725e7337/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java index 867c46a..92c650e 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java @@ -31,7 +31,7 @@ public class TopologyCheckAppProvider extends AbstractApplicationProvider<Topolo } @Override - public Optional<HealthCheck> getAppHealthCheck(Config config) { + public Optional<HealthCheck> getManagedHealthCheck(Config config) { return Optional.of(new TopologyCheckApplicationHealthCheck(config)); } }