This is an automated email from the ASF dual-hosted git repository. mmiklavcic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push: new 3c13173 METRON-2141 Cache REST API status update calls to the Storm UI (mmiklavc) closes apache/metron#1439 3c13173 is described below commit 3c1317360243398eef8097edcea5f7c1a0bb2f59 Author: mmiklavc <michael.miklav...@gmail.com> AuthorDate: Wed Jun 12 17:13:16 2019 -0600 METRON-2141 Cache REST API status update calls to the Storm UI (mmiklavc) closes apache/metron#1439 --- .../CURRENT/configuration/metron-rest-env.xml | 13 +- .../CURRENT/package/scripts/params/params_linux.py | 2 + .../METRON/CURRENT/package/templates/metron.j2 | 3 + .../METRON/CURRENT/themes/metron_theme.json | 20 +++ .../apache/metron/rest/model/TopologySummary.java | 11 +- metron-interface/metron-rest/README.md | 24 ++-- .../src/main/config/rest_application.yml | 4 + .../apache/metron/rest/MetronRestConstants.java | 2 + .../org/apache/metron/rest/config/StormConfig.java | 21 ++- .../service/impl/CachedStormStatusServiceImpl.java | 127 +++++++++++++++++ .../rest/service/impl/StormStatusServiceImpl.java | 14 +- .../org/apache/metron/rest/config/TestConfig.java | 12 ++ .../impl/CachedStormStatusServiceImplTest.java | 158 +++++++++++++++++++++ 13 files changed, 385 insertions(+), 26 deletions(-) diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml index 68b1140..145b64e 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml @@ -181,5 +181,16 @@ <description>The field name where the threat triage score can be found in the search indices. This setting primarily affects the Alerts UI.</description> <value>threat:triage:score</value> </property> - + <property> + <name>storm_status_cache_max_size</name> + <value>10000</value> + <description>The maximum size for the cache that fronts calls to the Storm API for topology status.</description> + <display-name>Storm Status Cache Max Size</display-name> + </property> + <property> + <name>storm_status_cache_timeout_seconds</name> + <value>5</value> + <description>Duration in seconds for cache entries to timeout. Note that the higher the value, the more stale the returned value will be.</description> + <display-name>Storm Status Cache Timeout Seconds</display-name> + </property> </configuration> diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index de6b8bc..a7f20fc 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -64,6 +64,8 @@ metron_alerts_ui_host = status_params.metron_alerts_ui_host metron_alerts_ui_port = status_params.metron_alerts_ui_port metron_alerts_ui_path = metron_home + '/web/alerts-ui/' metron_jvm_flags = config['configurations']['metron-rest-env']['metron_jvm_flags'] +storm_status_cache_max_size = config['configurations']['metron-rest-env']['storm_status_cache_max_size'] +storm_status_cache_timeout_seconds = config['configurations']['metron-rest-env']['storm_status_cache_timeout_seconds'] # Construct the profiles as a temp variable first. Only the first time it's set will carry through metron_spring_profiles_active = config['configurations']['metron-rest-env']['metron_spring_profiles_active'] diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 index 936118c..5c43bbd 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 @@ -66,3 +66,6 @@ PCAP_FINAL_OUTPUT_PATH="{{pcap_final_output_path}}" PCAP_PAGE_SIZE="{{pcap_page_size}}" PCAP_YARN_QUEUE="{{pcap_yarn_queue}}" PCAP_FINALIZER_THREADPOOL_SIZE="{{pcap_finalizer_threadpool_size}}" +STORM_STATUS_CACHE_MAX_SIZE="{{storm_status_cache_max_size}}" +STORM_STATUS_CACHE_TIMEOUT_SECONDS="{{storm_status_cache_timeout_seconds}}" + diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json index 69084e3..a9b7322 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json @@ -915,6 +915,14 @@ "subsection-name": "subsection-rest" }, { + "config": "metron-rest-env/storm_status_cache_max_size", + "subsection-name": "subsection-rest" + }, + { + "config": "metron-rest-env/storm_status_cache_timeout_seconds", + "subsection-name": "subsection-rest" + }, + { "config": "metron-management-ui-env/metron_management_ui_port", "subsection-name": "subsection-management-ui" }, @@ -1648,6 +1656,18 @@ } }, { + "config": "metron-rest-env/storm_status_cache_max_size", + "widget": { + "type": "text-field" + } + }, + { + "config": "metron-rest-env/storm_status_cache_timeout_seconds", + "widget": { + "type": "text-field" + } + }, + { "config": "metron-pcap-env/pcap_page_size", "widget": { "type": "text-field" diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java index 8621daf..5bd854b 100644 --- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java +++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java @@ -21,9 +21,16 @@ import java.util.Arrays; public class TopologySummary { - private TopologyStatus[] topologies; + private TopologyStatus[] topologies; - public TopologyStatus[] getTopologies() { + public TopologySummary() { + } + + public TopologySummary(TopologyStatus[] topologies) { + this.topologies = topologies; + } + + public TopologyStatus[] getTopologies() { if (topologies == null) { return new TopologyStatus[0]; } diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index c76a402..aa28260 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -79,17 +79,19 @@ No optional parameter has a default. | HDFS_URL | HDFS url or `fs.defaultFS` Hadoop setting (ex. hdfs://node1:8020) ### Optional - With Defaults -| Environment Variable | Description | Required | Default -| ------------------------------------- | ------------------------------------------------------------------------------------ | -------- | ------- -| METRON_LOG_DIR | Directory where the log file is written | Optional | /var/log/metron/ -| METRON_PID_FILE | File where the pid is written | Optional | /var/run/metron/ -| METRON_REST_PORT | REST application port | Optional | 8082 -| METRON_JDBC_CLIENT_PATH | Path to JDBC client jar | Optional | H2 is bundled -| METRON_TEMP_GROK_PATH | Temporary directory used to test grok statements | Optional | ./patterns/temp -| METRON_DEFAULT_GROK_PATH | Defaults HDFS directory used to store grok statements | Optional | /apps/metron/patterns -| SECURITY_ENABLED | Enables Kerberos support | Optional | false -| METRON_USER_ROLE | Name of the role at the authentication provider that provides user access to Metron. | Optional | USER -| METRON_ADMIN_ROLE | Name of the role at the authentication provider that provides administrative access to Metron.| Optional | ADMIN +| Environment Variable | Description | Required | Default +| ------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------- | -------- | ------- +| METRON_LOG_DIR | Directory where the log file is written | Optional | /var/log/metron/ +| METRON_PID_FILE | File where the pid is written | Optional | /var/run/metron/ +| METRON_REST_PORT | REST application port | Optional | 8082 +| METRON_JDBC_CLIENT_PATH | Path to JDBC client jar | Optional | H2 is bundled +| METRON_TEMP_GROK_PATH | Temporary directory used to test grok statements | Optional | ./patterns/temp +| METRON_DEFAULT_GROK_PATH | Defaults HDFS directory used to store grok statements | Optional | /apps/metron/patterns +| SECURITY_ENABLED | Enables Kerberos support | Optional | false +| METRON_USER_ROLE | Name of the role at the authentication provider that provides user access to Metron. | Optional | USER +| METRON_ADMIN_ROLE | Name of the role at the authentication provider that provides administrative access to Metron. | Optional | ADMIN +| STORM_STATUS_CACHE_MAX_SIZE | The maximum size for the cache that fronts calls to the Storm API for topology status. | Optional | 10000 +| STORM_STATUS_CACHE_TIMEOUT_SECONDS | Duration in seconds for cache entries to timeout. Note that the higher the value, the more stale the returned value will be. | Optional | 5 ### Optional - Blank Defaults | Environment Variable | Description | Required diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml index b6709a9..48cc5b4 100644 --- a/metron-interface/metron-rest/src/main/config/rest_application.yml +++ b/metron-interface/metron-rest/src/main/config/rest_application.yml @@ -40,6 +40,10 @@ storm: indexing: randomaccess.script.path: ${METRON_HOME}/bin/start_elasticsearch_topology.sh batch.script.path: ${METRON_HOME}/bin/start_hdfs_topology.sh + status: + cache: + max.size: ${STORM_STATUS_CACHE_MAX_SIZE} + timeout.seconds: ${STORM_STATUS_CACHE_TIMEOUT_SECONDS} kerberos: enabled: ${SECURITY_ENABLED} diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index b8a8306..54f721c 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -33,6 +33,8 @@ public class MetronRestConstants { public static final String GROK_PATH_KEY = "grokPath"; public static final String STORM_UI_SPRING_PROPERTY = "storm.ui.url"; + public static final String STORM_STATUS_CACHE_MAX_SIZE = "storm.status.cache.max.size"; + public static final String STORM_STATUS_CACHE_TIMEOUT_SECONDS = "storm.status.cache.timeout.seconds"; public static final String SUPERVISOR_SUMMARY_URL = "/api/v1/supervisor/summary"; public static final String TOPOLOGY_SUMMARY_URL = "/api/v1/topology/summary"; public static final String TOPOLOGY_URL = "/api/v1/topology"; diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java index 7a61cbc..7d31ce1 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java @@ -17,19 +17,22 @@ */ package org.apache.metron.rest.config; +import static org.apache.metron.rest.MetronRestConstants.DOCKER_PROFILE; +import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; + +import java.util.Arrays; +import org.apache.metron.rest.MetronRestConstants; +import org.apache.metron.rest.service.StormStatusService; +import org.apache.metron.rest.service.impl.CachedStormStatusServiceImpl; import org.apache.metron.rest.service.impl.DockerStormCLIWrapper; import org.apache.metron.rest.service.impl.StormCLIWrapper; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; -import java.util.Arrays; - -import static org.apache.metron.rest.MetronRestConstants.DOCKER_PROFILE; -import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; - @Configuration @Profile("!" + TEST_PROFILE) public class StormConfig { @@ -45,4 +48,12 @@ public class StormConfig { return new StormCLIWrapper(); } } + + @Bean + public StormStatusService stormStatusService( + @Autowired @Qualifier("StormStatusServiceImpl") StormStatusService wrappedService) { + long maxCacheSize = environment.getProperty(MetronRestConstants.STORM_STATUS_CACHE_MAX_SIZE, Long.class, 10000L); + long maxCacheTimeoutSeconds = environment.getProperty(MetronRestConstants.STORM_STATUS_CACHE_TIMEOUT_SECONDS, Long.class, 5L); + return new CachedStormStatusServiceImpl(wrappedService, maxCacheSize, maxCacheTimeoutSeconds); + } } diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImpl.java new file mode 100644 index 0000000..788d8f5 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImpl.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.rest.service.impl; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.metron.rest.model.SupervisorSummary; +import org.apache.metron.rest.model.TopologyResponse; +import org.apache.metron.rest.model.TopologyStatus; +import org.apache.metron.rest.model.TopologySummary; +import org.apache.metron.rest.service.StormStatusService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Decorator around the Storm status service that caches results. + */ +public class CachedStormStatusServiceImpl implements StormStatusService { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private enum CacheKey { + SUPERVISOR_SUMMARY, + TOPOLOGY_SUMMARY, + TOPOLOGY_STATUS, + ALL_TOPOLOGY_STATUS + } + + private StormStatusService stormService; + // cache is thread-safe + // key will be a base CacheKey or the base + a suffix. See for example getTopologyStatus(String name). + private Cache<Object, Object> statusCache; + + /** + * + * @param stormService service to decorate and delegate calls to. + * @param maxCacheSize max number of records in the backing cache. + * @param cacheExpirationSeconds number of seconds before the cache will expire an entry. + */ + public CachedStormStatusServiceImpl(StormStatusService stormService, long maxCacheSize, + long cacheExpirationSeconds) { + this.stormService = stormService; + LOG.info("Creating Storm service cache with max size '{}', record expiration seconds '{}'", + maxCacheSize, cacheExpirationSeconds); + Caffeine builder = Caffeine.newBuilder().maximumSize(maxCacheSize) + .expireAfterWrite(cacheExpirationSeconds, TimeUnit.SECONDS); + statusCache = builder.build(); + } + + @Override + public SupervisorSummary getSupervisorSummary() { + return (SupervisorSummary) statusCache + .get(CacheKey.SUPERVISOR_SUMMARY, cacheKey -> { + LOG.debug("Loading new supervisor summary"); + return stormService.getSupervisorSummary(); + }); + } + + @Override + public TopologySummary getTopologySummary() { + return (TopologySummary) statusCache + .get(CacheKey.TOPOLOGY_SUMMARY, cacheKey -> { + LOG.debug("Loading new topology summary"); + return stormService.getTopologySummary(); + }); + } + + /** + * Rather than worry about coalescing individual topology statuses with the call to get all topology statuses, + * we handle them independently. + * @param name topology name. + * @return status for this topolopgy. + */ + @Override + public TopologyStatus getTopologyStatus(String name) { + return (TopologyStatus) statusCache + .get(CacheKey.TOPOLOGY_STATUS + name, cacheKey -> { + LOG.debug("Loading new topology status for '{}'", name); + return stormService.getTopologyStatus(name); + }); + } + + @Override + public List<TopologyStatus> getAllTopologyStatus() { + return (List<TopologyStatus>) statusCache + .get(CacheKey.ALL_TOPOLOGY_STATUS, cacheKey -> { + LOG.debug("Loading all topology status"); + return stormService.getAllTopologyStatus(); + }); + } + + @Override + public TopologyResponse activateTopology(String name) { + return stormService.activateTopology(name); + } + + @Override + public TopologyResponse deactivateTopology(String name) { + return stormService.deactivateTopology(name); + } + + /** + * Resets the cache, i.e. empties it. + */ + public void reset() { + statusCache.invalidateAll(); + } +} diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java index 25df549..081bd4e 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java @@ -15,16 +15,19 @@ package org.apache.metron.rest.service.impl; +import static org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY; +import static org.apache.metron.rest.MetronRestConstants.SUPERVISOR_SUMMARY_URL; +import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_SUMMARY_URL; +import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.metron.common.configuration.SensorParserGroup; import org.apache.metron.parsers.topology.ParserTopologyCLI; -import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.SupervisorSummary; import org.apache.metron.rest.model.TopologyResponse; import org.apache.metron.rest.model.TopologyStatus; @@ -33,15 +36,12 @@ import org.apache.metron.rest.model.TopologySummary; import org.apache.metron.rest.service.SensorParserGroupService; import org.apache.metron.rest.service.StormStatusService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; -import static org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY; -import static org.apache.metron.rest.MetronRestConstants.SUPERVISOR_SUMMARY_URL; -import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_SUMMARY_URL; -import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL; - +@Qualifier("StormStatusServiceImpl") @Service public class StormStatusServiceImpl implements StormStatusService { diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index 920c7ab..d42f128 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -60,9 +60,13 @@ import org.apache.metron.rest.mock.MockPcapJobSupplier; import org.apache.metron.rest.mock.MockPcapToPdmlScriptWrapper; import org.apache.metron.rest.mock.MockStormCLIClientWrapper; import org.apache.metron.rest.mock.MockStormRestTemplate; +import org.apache.metron.rest.service.StormStatusService; +import org.apache.metron.rest.service.impl.CachedStormStatusServiceImpl; import org.apache.metron.rest.service.impl.PcapToPdmlScriptWrapper; import org.apache.metron.rest.service.impl.StormCLIWrapper; import org.apache.metron.rest.user.UserSettingsClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; @@ -237,4 +241,12 @@ public class TestConfig { public PcapToPdmlScriptWrapper pcapToPdmlScriptWrapper() { return new MockPcapToPdmlScriptWrapper(); } + + @Bean + public StormStatusService stormStatusService( + @Autowired @Qualifier("StormStatusServiceImpl") StormStatusService wrappedService) { + long maxCacheSize = 0L; + long maxCacheTimeoutSeconds = 0L; + return new CachedStormStatusServiceImpl(wrappedService, maxCacheSize, maxCacheTimeoutSeconds); + } } diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImplTest.java new file mode 100644 index 0000000..993465f --- /dev/null +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImplTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.rest.service.impl; + +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.metron.rest.model.SupervisorStatus; +import org.apache.metron.rest.model.SupervisorSummary; +import org.apache.metron.rest.model.TopologyResponse; +import org.apache.metron.rest.model.TopologyStatus; +import org.apache.metron.rest.model.TopologySummary; +import org.apache.metron.rest.service.StormStatusService; +import org.hamcrest.CoreMatchers; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class CachedStormStatusServiceImplTest { + + @Mock + private StormStatusService stormService; + private CachedStormStatusServiceImpl cachedStormStatusService; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + cachedStormStatusService = new CachedStormStatusServiceImpl(stormService, 150, 30); + } + + @Test + public void caches_supervisor_summary() { + SupervisorStatus supervisorStatus1 = new SupervisorStatus(); + SupervisorStatus supervisorStatus2 = new SupervisorStatus(); + SupervisorSummary supervisorSummary = new SupervisorSummary( + new SupervisorStatus[]{supervisorStatus1, supervisorStatus2}); + when(stormService.getSupervisorSummary()).thenReturn(supervisorSummary); + // should hit the cache + for (int i = 0; i < 100; i++) { + cachedStormStatusService.getSupervisorSummary(); + } + SupervisorSummary summary = cachedStormStatusService.getSupervisorSummary(); + assertThat("Number of supervisors did not match.", summary.getSupervisors().length, + CoreMatchers.equalTo(2)); + verify(stormService, times(1)).getSupervisorSummary(); + cachedStormStatusService.reset(); + summary = cachedStormStatusService.getSupervisorSummary(); + assertThat("Number of supervisors did not match.", summary.getSupervisors().length, + CoreMatchers.equalTo(2)); + verify(stormService, times(2)).getSupervisorSummary(); + } + + @Test + public void caches_topology_summary() { + TopologyStatus topologyStatus1 = new TopologyStatus(); + TopologyStatus topologyStatus2 = new TopologyStatus(); + TopologySummary topologySummary = new TopologySummary( + new TopologyStatus[]{topologyStatus1, topologyStatus2}); + when(stormService.getTopologySummary()).thenReturn(topologySummary); + // should hit the cache + for (int i = 0; i < 100; i++) { + cachedStormStatusService.getTopologySummary(); + } + TopologySummary summary = cachedStormStatusService.getTopologySummary(); + assertThat("Number of topologies did not match.", summary.getTopologies().length, + CoreMatchers.equalTo(2)); + verify(stormService, times(1)).getTopologySummary(); + cachedStormStatusService.reset(); + summary = cachedStormStatusService.getTopologySummary(); + assertThat("Number of topologies did not match.", summary.getTopologies().length, + CoreMatchers.equalTo(2)); + verify(stormService, times(2)).getTopologySummary(); + } + + @Test + public void caches_topology_status_by_name() { + String topologyName1 = "topology-1"; + String topologyName2 = "topology-2"; + TopologyStatus topologyStatus1 = new TopologyStatus(); + topologyStatus1.setName(topologyName1); + TopologyStatus topologyStatus2 = new TopologyStatus(); + topologyStatus2.setName(topologyName2); + when(stormService.getTopologyStatus(topologyName1)).thenReturn(topologyStatus1); + when(stormService.getTopologyStatus(topologyName2)).thenReturn(topologyStatus2); + // should hit the cache + for (int i = 0; i < 100; i++) { + cachedStormStatusService.getTopologyStatus(topologyName1); + cachedStormStatusService.getTopologyStatus(topologyName2); + } + TopologyStatus status1 = cachedStormStatusService.getTopologyStatus(topologyName1); + TopologyStatus status2 = cachedStormStatusService.getTopologyStatus(topologyName2); + assertThat("Name did not match for topology 1.", status1.getName(), + CoreMatchers.equalTo(topologyName1)); + assertThat("Name did not match for topology 2.", status2.getName(), + CoreMatchers.equalTo(topologyName2)); + verify(stormService, times(1)).getTopologyStatus(topologyName1); + verify(stormService, times(1)).getTopologyStatus(topologyName2); + cachedStormStatusService.reset(); + cachedStormStatusService.getTopologyStatus(topologyName1); + cachedStormStatusService.getTopologyStatus(topologyName2); + verify(stormService, times(2)).getTopologyStatus(topologyName1); + verify(stormService, times(2)).getTopologyStatus(topologyName2); + } + + @Test + public void caches_all_topology_status() { + TopologyStatus topologyStatus1 = new TopologyStatus(); + TopologyStatus topologyStatus2 = new TopologyStatus(); + List<TopologyStatus> allTopologyStatus = ImmutableList.of(topologyStatus1, topologyStatus2); + when(stormService.getAllTopologyStatus()).thenReturn(allTopologyStatus); + // should hit the cache + for (int i = 0; i < 100; i++) { + cachedStormStatusService.getAllTopologyStatus(); + } + List<TopologyStatus> allStatus = cachedStormStatusService.getAllTopologyStatus(); + assertThat("Number of topologies returned by all topology status check did not match.", + allStatus.size(), CoreMatchers.equalTo(2)); + verify(stormService, times(1)).getAllTopologyStatus(); + cachedStormStatusService.reset(); + cachedStormStatusService.getAllTopologyStatus(); + verify(stormService, times(2)).getAllTopologyStatus(); + } + + @Test + public void admin_functions_act_as_simple_passthroughs_to_storm_service() { + TopologyResponse topologyResponse = new TopologyResponse(); + when(stormService.activateTopology(anyString())).thenReturn(topologyResponse); + when(stormService.deactivateTopology(anyString())).thenReturn(topologyResponse); + for (int i = 0; i < 100; i++) { + cachedStormStatusService.activateTopology("foo"); + cachedStormStatusService.deactivateTopology("foo"); + } + verify(stormService, times(100)).activateTopology(anyString()); + verify(stormService, times(100)).deactivateTopology(anyString()); + } +}