This is an automated email from the ASF dual-hosted git repository. lmccay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push: new 8920651 KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307) 8920651 is described below commit 89206511aad7cacb91c7fa032490816e7cdd18ba Author: lmccay <lmc...@apache.org> AuthorDate: Wed Apr 8 14:38:46 2020 -0400 KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307) * KNOX-2304 - CM discovery cluster config monitor needs to be aware of all relevant CM event types Change-Id: Ic26ad36fcb110e01d30d636f14d5b383de01ff17 * KNOX-2304 - address review comments Change-Id: I5c2009f505b31c5c69ee7672ed37d21e1a7c48bb --- .../cm/monitor/PollingConfigurationAnalyzer.java | 100 ++++++++++++++------- .../monitor/PollingConfigurationAnalyzerTest.java | 37 +++++++- 2 files changed, 101 insertions(+), 36 deletions(-) diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java index b9f163b..380962a 100644 --- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java @@ -60,17 +60,27 @@ import static org.apache.knox.gateway.topology.discovery.ClusterConfigurationMon @SuppressWarnings("PMD.DoNotUseThreads") public class PollingConfigurationAnalyzer implements Runnable { - // The format of the filter employed when restart events are queried from ClouderaManager - private static final String RESTART_EVENTS_QUERY_FORMAT = + private static final String COMMAND = "COMMAND"; + + private static final String COMMAND_STATUS = "COMMAND_STATUS"; + + private static final String STARTED_STATUS = "STARTED"; + + private static final String SUCCEEDED_STATUS = "SUCCEEDED"; + + private static 
final String RESTART_COMMAND = "Restart"; + + private static final String START_COMMAND = "Start"; + + // The format of the filter employed when cluster audit events are queried from ClouderaManager + private static final String EVENTS_QUERY_FORMAT = "category==" + ApiEventCategory.AUDIT_EVENT.getValue() + - ";attributes.command==Restart" + - ";attributes.command_status==SUCCEEDED" + ";attributes.cluster==\"%s\"%s"; - // The format of the timestamp element of the restart events query filter + // The format of the timestamp element of the start events query filter private static final String EVENTS_QUERY_TIMESTAMP_FORMAT = ";timeOccurred=gt=%s"; - // The default amount of time before "now" to check for restart events the first time + // The default amount of time before "now" to check for start events the first time private static final long DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET = (60 * 60 * 1000); // one hour private static final int DEFAULT_POLLING_INTERVAL = 60; @@ -100,10 +110,10 @@ public class PollingConfigurationAnalyzer implements Runnable { // Cache of ClouderaManager API clients, keyed by discovery address private final Map<String, DiscoveryApiClient> clients = new ConcurrentHashMap<>(); - // Timestamp records of the most recent restart event query per discovery address + // Timestamp records of the most recent start event query per discovery address private Map<String, String> eventQueryTimestamps = new ConcurrentHashMap<>(); - // The amount of time before "now" to will check for restart events the first time + // The amount of time before "now" to check for start events the first time private long eventQueryDefaultTimestampOffset = DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET; private boolean isActive; @@ -163,34 +173,34 @@ public class PollingConfigurationAnalyzer implements Runnable { continue; } - // Configuration changes don't mean anything without corresponding service restarts.
Therefore, monitor - // restart events, and check the configuration only of the restarted service(s) to identify changes + // Configuration changes don't mean anything without corresponding service starts or restarts. Therefore, monitor + // start events, and check the configuration only of the started or restarted service(s) to identify changes // that should trigger re-discovery. - List<RestartEvent> restartEvents = getRestartEvents(address, clusterName); + List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName); - // If there are no recent restart events, then nothing to do now - if (!restartEvents.isEmpty()) { + // If there are no recent start events, then nothing to do now + if (!relevantEvents.isEmpty()) { boolean configHasChanged = false; - // If there are restart events, then check the previously-recorded properties for the same service to + // If there are start events, then check the previously-recorded properties for the same service to // identify if the configuration has changed Map<String, ServiceConfigurationModel> serviceConfigurations = configCache.getClusterServiceConfigurations(address, clusterName); - // Those services for which a restart even has been handled + // Those services for which a start event has been handled List<String> handledServiceTypes = new ArrayList<>(); - for (RestartEvent re : restartEvents) { + for (StartEvent re : relevantEvents) { String serviceType = re.getServiceType(); - // Determine if we've already handled a restart event for this service type + // Determine if we've already handled a start event for this service type if (!handledServiceTypes.contains(serviceType)) { // Get the previously-recorded configuration ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType()); if (serviceConfig != null) { - // Get the current config for the restarted service, and compare with the previously-recorded config + // Get the current config for the started service, and compare with the
previously-recorded config ServiceConfigurationModel currentConfig = getCurrentServiceConfiguration(address, clusterName, re.getService()); @@ -337,15 +347,15 @@ public class PollingConfigurationAnalyzer implements Runnable { } /** - * Get restart events for the specified ClouderaManager cluster. + * Get relevant events for the specified ClouderaManager cluster. * * @param address The address of the ClouderaManager instance. * @param clusterName The name of the cluster. * - * @return A List of RestartEvent objects for service restart events since the last time they were queried. + * @return A List of StartEvent objects for service start events since the last time they were queried. */ - private List<RestartEvent> getRestartEvents(final String address, final String clusterName) { - List<RestartEvent> restartEvents = new ArrayList<>(); + private List<StartEvent> getRelevantEvents(final String address, final String clusterName) { + List<StartEvent> relevantEvents = new ArrayList<>(); // Get the last event query timestamp String lastTimestamp = getEventQueryTimestamp(address, clusterName); @@ -360,19 +370,43 @@ public class PollingConfigurationAnalyzer implements Runnable { // Record the new event query timestamp for this address/cluster setEventQueryTimestamp(address, clusterName, Instant.now()); - // Query the event log from CM for service/cluster restart events - List<ApiEvent> events = queryRestartEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)), + // Query the event log from CM for service/cluster start events + List<ApiEvent> events = queryEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)), clusterName, lastTimestamp); for (ApiEvent event : events) { - restartEvents.add(new RestartEvent(event)); + if(isRelevantEvent(event)) { + relevantEvents.add(new StartEvent(event)); + } + } + + return relevantEvents; + } + + @SuppressWarnings("unchecked") + private boolean isRelevantEvent(ApiEvent event) { + boolean rc = false; 
+ String command = null; + String status = null; + List<ApiEventAttribute> attributes = event.getAttributes(); + Map<String,Object> map = getAttributeMap(attributes); + command = (String) ((List<String>) map.get(COMMAND)).get(0); + status = (String) ((List<String>) map.get(COMMAND_STATUS)).get(0); + if (START_COMMAND.equals(command) || RESTART_COMMAND.equals(command) && + SUCCEEDED_STATUS.equals(status) || STARTED_STATUS.equals(status)) { + rc = true; } + return rc; + } - return restartEvents; + private Map<String, Object> getAttributeMap(List<ApiEventAttribute> attributes) { + Map<String,Object> map = new HashMap<>(); + attributes.forEach(attr -> { map.put(attr.getName(), attr.getValues());}); + return map; } /** - * Query the ClouderaManager instance associated with the specified client for any service restart events in the + * Query the ClouderaManager instance associated with the specified client for any service start events in the * specified cluster since the specified time. * * @param client A ClouderaManager API client. @@ -381,15 +415,15 @@ public class PollingConfigurationAnalyzer implements Runnable { * * @return A List of ApiEvent objects representing the relevant events since the specified time. */ - protected List<ApiEvent> queryRestartEvents(final ApiClient client, final String clusterName, final String since) { + protected List<ApiEvent> queryEvents(final ApiClient client, final String clusterName, final String since) { List<ApiEvent> events = new ArrayList<>(); - // Setup the query for restart events + // Setup the query for events String timeFilter = (since != null) ? 
String.format(Locale.ROOT, EVENTS_QUERY_TIMESTAMP_FORMAT, since) : ""; String queryString = String.format(Locale.ROOT, - RESTART_EVENTS_QUERY_FORMAT, + EVENTS_QUERY_FORMAT, clusterName, timeFilter); @@ -500,9 +534,9 @@ public class PollingConfigurationAnalyzer implements Runnable { } /** - * Internal representation of a ClouderaManager service restart event + * Internal representation of a ClouderaManager service start event */ - static final class RestartEvent { + static final class StartEvent { private static final String ATTR_CLUSTER = "CLUSTER"; private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE"; @@ -521,7 +555,7 @@ public class PollingConfigurationAnalyzer implements Runnable { private String serviceType; private String service; - RestartEvent(final ApiEvent auditEvent) { + StartEvent(final ApiEvent auditEvent) { if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) { throw new IllegalArgumentException("Invalid event category " + auditEvent.getCategory().getValue()); } diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java index a817c83..cb2066e 100644 --- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java +++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java @@ -27,7 +27,9 @@ import org.apache.knox.gateway.services.ServiceType; import org.apache.knox.gateway.services.topology.TopologyService; import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService; import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig; +import org.apache.knox.gateway.topology.discovery.cm.model.cm.ClouderaManagerAPIServiceModelGenerator; import 
org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator; +import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator; import org.easymock.EasyMock; import org.junit.Test; @@ -75,7 +77,7 @@ public class PollingConfigurationAnalyzerTest { apiEventAttrs.add(createEventAttribute("SERVICE", service)); ApiEvent apiEvent = createApiEvent(category, apiEventAttrs); - PollingConfigurationAnalyzer.RestartEvent restartEvent = new PollingConfigurationAnalyzer.RestartEvent(apiEvent); + PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent); assertNotNull(restartEvent); assertEquals(clusterName, restartEvent.getClusterName()); assertEquals(serviceType, restartEvent.getServiceType()); @@ -145,11 +147,33 @@ public class PollingConfigurationAnalyzerTest { restartEventAttrs.add(createEventAttribute("CLUSTER", clusterName)); restartEventAttrs.add(createEventAttribute("SERVICE_TYPE", NameNodeServiceModelGenerator.SERVICE_TYPE)); restartEventAttrs.add(createEventAttribute("SERVICE", NameNodeServiceModelGenerator.SERVICE)); + restartEventAttrs.add(createEventAttribute("COMMAND", "Restart")); + restartEventAttrs.add(createEventAttribute("COMMAND_STATUS", "SUCCEEDED")); ApiEvent restartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, restartEventAttrs); pca.addRestartEvent(clusterName, restartEvent); + // Simulate a service Start event + List<ApiEventAttribute> startEventAttrs = new ArrayList<>(); + startEventAttrs.add(createEventAttribute("CLUSTER", clusterName)); + startEventAttrs.add(createEventAttribute("SERVICE_TYPE", ClouderaManagerAPIServiceModelGenerator.SERVICE_TYPE)); + startEventAttrs.add(createEventAttribute("SERVICE", ClouderaManagerAPIServiceModelGenerator.SERVICE)); + startEventAttrs.add(createEventAttribute("COMMAND", "Start")); + startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "STARTED")); + ApiEvent startEvent = 
createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs); + pca.addRestartEvent(clusterName, startEvent); + + // Simulate a failed service Start event + startEventAttrs = new ArrayList<>(); + startEventAttrs.add(createEventAttribute("CLUSTER", clusterName)); + startEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE)); + startEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE)); + startEventAttrs.add(createEventAttribute("COMMAND", "Start")); + startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "FAILED")); + ApiEvent failedStartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs); + pca.addRestartEvent(clusterName, failedStartEvent); + try { - pollingThreadExecutor.awaitTermination(15, TimeUnit.SECONDS); + pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { // } @@ -158,6 +182,7 @@ public class PollingConfigurationAnalyzerTest { pca.stop(); assertTrue("Expected a change notification", listener.wasNotified(address, clusterName)); + assertEquals(2, listener.howManyNotifications(address, clusterName)); } @@ -351,7 +376,7 @@ public class PollingConfigurationAnalyzerTest { } @Override - protected List<ApiEvent> queryRestartEvents(ApiClient client, String clusterName, String since) { + protected List<ApiEvent> queryEvents(ApiClient client, String clusterName, String since) { return restartEvents.computeIfAbsent(clusterName, l -> new ArrayList<>()); } @@ -370,15 +395,21 @@ public class PollingConfigurationAnalyzerTest { private static class ChangeListener implements ConfigurationChangeListener { private final Map<String, String> notifications = new HashMap<>(); + private final List<String> events = new ArrayList<>(); @Override public void onConfigurationChange(String source, String clusterName) { notifications.put(source, clusterName); + events.add(source + "+" + clusterName); } boolean wasNotified(final String source, 
final String clusterName) { return clusterName.equals(notifications.get(source)); } + + int howManyNotifications(final String source, final String clusterName) { + return events.size(); + } } }