This is an automated email from the ASF dual-hosted git repository. smolnar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push: new f96b62d KNOX-2351 - Catching any errors while monitoring CM configuration changes (#324) f96b62d is described below commit f96b62d133d300519fc45f295f5e7a2b58922949 Author: Sandor Molnar <smol...@apache.org> AuthorDate: Mon Apr 27 16:58:09 2020 +0200 KNOX-2351 - Catching any errors while monitoring CM configuration changes (#324) --- .../ClouderaManagerServiceDiscoveryMessages.java | 3 + .../cm/monitor/PollingConfigurationAnalyzer.java | 153 +++++++++++---------- 2 files changed, 82 insertions(+), 74 deletions(-) diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java index 289c752..7f7c644 100644 --- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java @@ -126,6 +126,9 @@ public interface ClouderaManagerServiceDiscoveryMessages { @Message(level = MessageLevel.DEBUG, text = "Checking {0} @ {1} for configuration changes...") void checkingClusterConfiguration(String clusterName, String discoveryAddress); + @Message(level = MessageLevel.ERROR, text = "Error while monitoring ClouderaManager configuration changes: {0}") + void clouderaManagerConfigurationChangesMonitoringError(@StackTrace(level = MessageLevel.DEBUG) Exception e); + @Message(level = MessageLevel.ERROR, text = "Error getting service configuration details from ClouderaManager: {0}") void clouderaManagerConfigurationAPIError(@StackTrace(level = MessageLevel.DEBUG) ApiException e); diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java index fb8d73c..4b7935f 100644 --- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java @@ -162,93 +162,98 @@ public class PollingConfigurationAnalyzer implements Runnable { isActive = true; while (isActive) { - List<String> clustersToStopMonitoring = new ArrayList<>(); + try { + final List<String> clustersToStopMonitoring = new ArrayList<>(); + + for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) { + String address = entry.getKey(); + for (String clusterName : entry.getValue()) { + log.checkingClusterConfiguration(clusterName, address); + + // Check here for existing descriptor references, and add to the removal list if there are not any + if (!clusterReferencesExist(address, clusterName)) { + clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName); + continue; + } - for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) { - String address = entry.getKey(); - for (String clusterName : entry.getValue()) { - log.checkingClusterConfiguration(clusterName, address); + // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor + // start events, and check the configuration only of the restarted service(s) to identify changes + // that should trigger re-discovery. + final List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName); - // Check here for existing descriptor references, and add to the removal list if there are not any - if (!clusterReferencesExist(address, clusterName)) { - clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName); - continue; + // If there are no recent start events, then nothing to do now + if (!relevantEvents.isEmpty()) { + // If a change has occurred, notify the listeners + if (hasConfigChanged(address, clusterName, relevantEvents)) { + notifyChangeListener(address, clusterName); + } + } } + } - // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor - // start events, and check the configuration only of the restarted service(s) to identify changes - // that should trigger re-discovery. - List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName); - - // If there are no recent start events, then nothing to do now - if (!relevantEvents.isEmpty()) { - boolean configHasChanged = false; - - // If there are start events, then check the previously-recorded properties for the same service to - // identify if the configuration has changed - Map<String, ServiceConfigurationModel> serviceConfigurations = - configCache.getClusterServiceConfigurations(address, clusterName); - - // Those services for which a start even has been handled - List<String> handledServiceTypes = new ArrayList<>(); - - for (StartEvent re : relevantEvents) { - String serviceType = re.getServiceType(); - - // Determine if we've already handled a start event for this service type - if (!handledServiceTypes.contains(serviceType)) { - - // Get the previously-recorded configuration - ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType()); - - if (serviceConfig != null) { - // Get the current config for the started service, and compare with the previously-recorded config - ServiceConfigurationModel currentConfig = - getCurrentServiceConfiguration(address, clusterName, re.getService()); - - if (currentConfig != null) { - log.analyzingCurrentServiceConfiguration(re.getService()); - try { - configHasChanged = hasConfigurationChanged(serviceConfig, currentConfig); - } catch (Exception e) { - log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e); - } - } - } else { - // A new service (no prior config) represent a config change, since a descriptor may have referenced - // the "new" service, but discovery had previously not succeeded because the service had not been - // configured (appropriately) at that time. - log.serviceEnabled(re.getService()); - configHasChanged = true; - } - - handledServiceTypes.add(serviceType); - } + // Remove outdated entries from the cache + for (String fqcn : clustersToStopMonitoring) { + String[] parts = fqcn.split(FQCN_DELIM); + stopMonitoring(parts[0], parts[1]); + } + clustersToStopMonitoring.clear(); // reset the removal list - if (configHasChanged) { - break; // No need to continue checking once we've identified one reason to perform discovery again - } - } + waitFor(interval); + } catch (Exception e) { + log.clouderaManagerConfigurationChangesMonitoringError(e); + } + } + + log.stoppedClouderaManagerConfigMonitor(); + } + + private boolean hasConfigChanged(String address, String clusterName, List<StartEvent> relevantEvents) { + // If there are start events, then check the previously-recorded properties for the same service to + // identify if the configuration has changed + final Map<String, ServiceConfigurationModel> serviceConfigurations = configCache.getClusterServiceConfigurations(address, clusterName); + + // Those services for which a start even has been handled + final List<String> handledServiceTypes = new ArrayList<>(); + + boolean configHasChanged = false; + for (StartEvent re : relevantEvents) { + String serviceType = re.getServiceType(); + + // Determine if we've already handled a start event for this service type + if (!handledServiceTypes.contains(serviceType)) { + + // Get the previously-recorded configuration + ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType()); - // If a change has occurred, notify the listeners - if (configHasChanged) { - notifyChangeListener(address, clusterName); + if (serviceConfig != null) { + // Get the current config for the started service, and compare with the previously-recorded config + ServiceConfigurationModel currentConfig = + getCurrentServiceConfiguration(address, clusterName, re.getService()); + + if (currentConfig != null) { + log.analyzingCurrentServiceConfiguration(re.getService()); + try { + configHasChanged = hasConfigurationChanged(serviceConfig, currentConfig); + } catch (Exception e) { + log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e); } } + } else { + // A new service (no prior config) represent a config change, since a descriptor may have referenced + // the "new" service, but discovery had previously not succeeded because the service had not been + // configured (appropriately) at that time. + log.serviceEnabled(re.getService()); + configHasChanged = true; } - } - // Remove outdated entries from the cache - for (String fqcn : clustersToStopMonitoring) { - String[] parts = fqcn.split(FQCN_DELIM); - stopMonitoring(parts[0], parts[1]); + handledServiceTypes.add(serviceType); } - clustersToStopMonitoring.clear(); // reset the removal list - waitFor(interval); + if (configHasChanged) { + break; // No need to continue checking once we've identified one reason to perform discovery again + } } - - log.stoppedClouderaManagerConfigMonitor(); + return configHasChanged; } private TopologyService getTopologyService() {