Repository: ambari Updated Branches: refs/heads/branch-2.1 bed766879 -> ee7c3bdc2 refs/heads/trunk b179d8de9 -> 6bc870f6c
AMBARI-13663. Add support of api operations retry. (mpapirkovskyy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bc870f6 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bc870f6 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bc870f6 Branch: refs/heads/trunk Commit: 6bc870f6c955aec07a19c1b7d222c99a2701d999 Parents: b179d8d Author: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com> Authored: Wed Nov 4 01:54:35 2015 +0200 Committer: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com> Committed: Wed Nov 4 02:02:26 2015 +0200 ---------------------------------------------------------------------- .../ambari/server/api/services/BaseService.java | 2 + .../server/configuration/Configuration.java | 43 +++++++++ .../ambari/server/controller/AmbariServer.java | 3 + .../internal/AbstractResourceProvider.java | 34 +++++++- .../org/apache/ambari/server/state/Cluster.java | 5 ++ .../server/state/cluster/ClusterImpl.java | 92 +++++++++++++++++++- .../server/state/cluster/ClustersImpl.java | 3 +- .../apache/ambari/server/utils/RetryHelper.java | 85 ++++++++++++++++++ 8 files changed, 261 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java index 1016ed7..7945599 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java @@ -28,6 +28,7 @@ import org.apache.ambari.server.api.services.serializers.CsvSerializer; import 
org.apache.ambari.server.api.services.serializers.JsonSerializer; import org.apache.ambari.server.api.services.serializers.ResultSerializer; import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.utils.RetryHelper; import org.eclipse.jetty.util.ajax.JSON; import javax.ws.rs.core.HttpHeaders; @@ -117,6 +118,7 @@ public abstract class BaseService { builder.type(mediaType); } + RetryHelper.clearAffectedClusters(); return builder.build(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 3a282ed..b4d5de8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -202,6 +202,12 @@ public class Configuration { public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS = "server.jdbc.connection-pool.acquisition-retry-attempts"; public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_DELAY = "server.jdbc.connection-pool.acquisition-retry-delay"; + public static final String API_OPERATIONS_RETRY_ATTEMPTS_KEY = "api.operations.retry-attempts"; + public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY = "blueprints.operations.retry-attempts"; + public static final String API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0"; + public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0"; + public static final int RETRY_ATTEMPTS_LIMIT = 10; + public static final String SERVER_JDBC_RCA_USER_NAME_KEY = "server.jdbc.rca.user.name"; public static final String SERVER_JDBC_RCA_USER_PASSWD_KEY = 
"server.jdbc.rca.user.passwd"; public static final String SERVER_JDBC_RCA_DRIVER_KEY = "server.jdbc.rca.driver"; @@ -2351,4 +2357,41 @@ public class Configuration { public int getAlertCacheSize() { return Integer.parseInt(properties.getProperty(ALERTS_CACHE_SIZE, ALERTS_CACHE_SIZE_DEFAULT)); } + + /** + * Reads {@code API_OPERATIONS_RETRY_ATTEMPTS_KEY}, using {@code API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT} as the fallback value. + * A negative value is logged and replaced by the default; a value above {@code RETRY_ATTEMPTS_LIMIT} is logged and capped at that limit. + * NOTE(review): Integer.valueOf throws NumberFormatException for non-numeric text and nothing here catches it - confirm that is intended. + * + * @return number of retry attempts for API update requests + */ + public int getApiOperationsRetryAttempts() { + String property = properties.getProperty(API_OPERATIONS_RETRY_ATTEMPTS_KEY, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT); + Integer attempts = Integer.valueOf(property); + if (attempts < 0) { + LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value reset to default {}", + attempts, RETRY_ATTEMPTS_LIMIT, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT); + attempts = Integer.valueOf(API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT); + } else if (attempts > RETRY_ATTEMPTS_LIMIT) { + LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value set to {}", + attempts, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT); + attempts = RETRY_ATTEMPTS_LIMIT; + } + return attempts; + } + + /** + * Reads {@code BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY}, using {@code BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT} as the fallback value. + * A negative value is logged and replaced by the default; a value above {@code RETRY_ATTEMPTS_LIMIT} is logged and capped at that limit. + * NOTE(review): Integer.valueOf throws NumberFormatException for non-numeric text and nothing here catches it - confirm that is intended. + * + * @return number of retry attempts for blueprints operations + */ + public int getBlueprintsOperationsRetryAttempts() { + String property = properties.getProperty(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY, + BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT); + Integer attempts = Integer.valueOf(property); + if (attempts < 0) { + LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. Value reset to default {}", + attempts, RETRY_ATTEMPTS_LIMIT, BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT); + attempts = Integer.valueOf(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT); + } else if (attempts > RETRY_ATTEMPTS_LIMIT) { + LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. 
Value set to {}", + attempts, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT); + attempts = RETRY_ATTEMPTS_LIMIT; + } + return attempts; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java index 177bbc8..ea178b5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java @@ -99,6 +99,7 @@ import org.apache.ambari.server.topology.BlueprintFactory; import org.apache.ambari.server.topology.SecurityConfigurationFactory; import org.apache.ambari.server.topology.TopologyManager; import org.apache.ambari.server.topology.TopologyRequestFactoryImpl; +import org.apache.ambari.server.utils.RetryHelper; import org.apache.ambari.server.utils.StageUtils; import org.apache.ambari.server.view.ViewRegistry; import org.apache.velocity.app.Velocity; @@ -708,6 +709,8 @@ public class AmbariServer { ActionManager.setTopologyManager(injector.getInstance(TopologyManager.class)); TopologyManager.init(injector.getInstance(StackAdvisorBlueprintProcessor.class)); StackAdvisorBlueprintProcessor.init(injector.getInstance(StackAdvisorHelper.class)); + + RetryHelper.init(configs.getApiOperationsRetryAttempts(), configs.getBlueprintsOperationsRetryAttempts()); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java index 0c6a07c..2bf8fe3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java @@ -38,6 +38,7 @@ import org.apache.ambari.server.controller.predicate.EqualsPredicate; import org.apache.ambari.server.controller.spi.*; import org.apache.ambari.server.controller.utilities.PredicateHelper; import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.utils.RetryHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -269,7 +270,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R protected <T> T createResources(Command<T> command) throws SystemException, ResourceAlreadyExistsException, NoSuchParentResourceException { try { - return command.invoke(); + return invokeWithRetry(command); } catch (ParentObjectNotFoundException e) { throw new NoSuchParentResourceException(e.getMessage(), e); } catch (DuplicateResourceException e) { @@ -327,7 +328,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R protected <T> T modifyResources (Command<T> command) throws SystemException, NoSuchResourceException, NoSuchParentResourceException { try { - return command.invoke(); + return invokeWithRetry(command); } catch (ParentObjectNotFoundException e) { throw new NoSuchParentResourceException(e.getMessage(), e); } catch (ObjectNotFoundException e) { @@ -439,6 +440,35 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R return predicates.size() == 1 && PredicateHelper.getPropertyIds(predicate).containsAll(getPKPropertyIds()); } + //Invokes the command; on a database exception the recorded clusters are invalidated and the command is retried up to RetryHelper.getApiOperationsRetryAttempts() more times, any other exception is rethrown at once + private <T> T invokeWithRetry(Command<T> command) throws AmbariException { + 
RetryHelper.clearAffectedClusters(); + int retryAttempts = RetryHelper.getApiOperationsRetryAttempts(); + do { + + try { + return command.invoke(); + } catch (Exception e) { + if (RetryHelper.isDatabaseException(e)) { + /* Invalidate the recorded clusters before either retrying or rethrowing. */ + RetryHelper.invalidateAffectedClusters(); + + if (retryAttempts > 0) { + LOG.error("Ignoring database exception to perform operation retry, attempts remaining: " + retryAttempts, e); + retryAttempts--; + } else { + RetryHelper.clearAffectedClusters(); + throw e; + } + } else { + RetryHelper.clearAffectedClusters(); + throw e; + } + } + + } while (true); + } + // ----- Inner interface --------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index f32e552..2afba7e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -608,4 +608,9 @@ public interface Cluster { * {@code null}). 
*/ void removeConfigurations(StackId stackId); + + /** + * Clear cluster caches and re-read data from database + */ + void invalidateData(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index ef225b0..6af6a82 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -145,12 +145,14 @@ public class ClusterImpl implements Cluster { private StackId desiredStackVersion; + private volatile boolean desiredStackVersionSet = true; + private volatile Map<String, Service> services = null; /** * [ Config Type -> [ Config Version Tag -> Config ] ] */ - private Map<String, Map<String, Config>> allConfigs; + private volatile Map<String, Map<String, Config>> allConfigs; /** * [ ServiceName -> [ ServiceComponentName -> [ HostName -> [ ... 
] ] ] ] @@ -918,6 +920,7 @@ public class ClusterImpl implements Cluster { @Override public StackId getDesiredStackVersion() { + loadStackVersion(); clusterGlobalLock.readLock().lock(); try { return desiredStackVersion; @@ -1756,6 +1759,7 @@ public class ClusterImpl implements Cluster { @Override public Map<String, Config> getConfigsByType(String configType) { + loadConfigurations(); clusterGlobalLock.readLock().lock(); try { if (!allConfigs.containsKey(configType)) { @@ -1770,6 +1774,7 @@ public class ClusterImpl implements Cluster { @Override public Config getConfig(String configType, String versionTag) { + loadConfigurations(); clusterGlobalLock.readLock().lock(); try { if (!allConfigs.containsKey(configType) @@ -1801,6 +1806,7 @@ public class ClusterImpl implements Cluster { @Override public void addConfig(Config config) { + loadConfigurations(); clusterGlobalLock.writeLock().lock(); try { if (config.getType() == null || config.getType().isEmpty()) { @@ -1818,6 +1824,7 @@ public class ClusterImpl implements Cluster { @Override public Collection<Config> getAllConfigs() { + loadConfigurations(); clusterGlobalLock.readLock().lock(); try { List<Config> list = new ArrayList<Config>(); @@ -1835,6 +1842,7 @@ public class ClusterImpl implements Cluster { @Override public ClusterResponse convertToResponse() throws AmbariException { + loadStackVersion(); String clusterName = getClusterName(); Map<String, Host> hosts = clusters.getHostsForCluster(clusterName); clusterGlobalLock.readLock().lock(); @@ -1851,6 +1859,7 @@ public class ClusterImpl implements Cluster { @Override public void debugDump(StringBuilder sb) { loadServices(); + loadStackVersion(); clusterGlobalLock.readLock().lock(); try { sb.append("Cluster={ clusterName=").append(getClusterName()).append( @@ -2022,6 +2031,7 @@ public class ClusterImpl implements Cluster { @Override public Map<String, DesiredConfig> getDesiredConfigs() { + loadConfigurations(); clusterGlobalLock.readLock().lock(); try { Map<String, 
DesiredConfig> map = new HashMap<String, DesiredConfig>(); @@ -2205,8 +2215,8 @@ public class ClusterImpl implements Cluster { } @Override - @RequiresSession public List<ServiceConfigVersionResponse> getServiceConfigVersions() { + loadConfigurations(); clusterGlobalLock.readLock().lock(); try { List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<ServiceConfigVersionResponse>(); @@ -2468,6 +2478,7 @@ public class ClusterImpl implements Cluster { @Override public Config getDesiredConfigByType(String configType) { + loadConfigurations(); clusterGlobalLock.readLock().lock(); try { for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) { @@ -2873,7 +2884,6 @@ public class ClusterImpl implements Cluster { * {@inheritDoc} */ @Override - @Transactional public void applyLatestConfigurations(StackId stackId) { clusterGlobalLock.writeLock().lock(); try { @@ -3036,4 +3046,80 @@ public class ClusterImpl implements Cluster { } } } + /** Populates allConfigs through cacheConfigurations() when it is null; the null check is repeated after taking the cluster-wide write lock. */ + private void loadConfigurations() { + if (allConfigs != null) { + return; + } + clusterGlobalLock.writeLock().lock(); + try { + if (allConfigs != null) { + return; + } + cacheConfigurations(); + + } finally { + clusterGlobalLock.writeLock().unlock(); + } + } + /** Rebuilds desiredStackVersion from clusterEntity.getDesiredStack() when desiredStackVersionSet is false, then calls loadServiceConfigTypes() if the stack name and version are both non-empty. */ + private void loadStackVersion() { + if (desiredStackVersionSet) { + return; + } + clusterGlobalLock.writeLock().lock(); + try { + + if (desiredStackVersionSet) { + return; + } + + desiredStackVersion = new StackId(clusterEntity.getDesiredStack()); + + if (!StringUtils.isEmpty(desiredStackVersion.getStackName()) && ! 
+ StringUtils.isEmpty(desiredStackVersion.getStackVersion())) { + try { + loadServiceConfigTypes(); + } catch (AmbariException e) { + //TODO recheck wrapping exception here, required for lazy loading after invalidation + throw new RuntimeException(e); + } + } + + desiredStackVersionSet = true; + + } finally { + clusterGlobalLock.writeLock().unlock(); + } + + } + + /** + * Clears the cached cluster state so that it is re-read from the database; to be used in case of desync. + * Under the cluster-wide write lock this nulls allConfigs, services and clusterConfigGroups, empties both service-component-host maps, resets desiredStackVersionSet and svcHostsLoaded, and then calls refresh(). + */ + @Override + public void invalidateData() { + clusterGlobalLock.writeLock().lock(); + try { + allConfigs = null; + services = null; + desiredStackVersionSet = false; + + serviceComponentHosts.clear(); + serviceComponentHostsByHost.clear(); + svcHostsLoaded = false; + + clusterConfigGroups = null; + + //TODO investigate reset request executions, it has separate api which is not too heavy + + refresh(); + + } finally { + clusterGlobalLock.writeLock().unlock(); + } + } } + + http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java index a89fb91..9ea9581 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java @@ -66,6 +66,7 @@ import org.apache.ambari.server.state.SecurityType; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.configgroup.ConfigGroup; import org.apache.ambari.server.state.host.HostFactory; +import org.apache.ambari.server.utils.RetryHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.core.GrantedAuthority; @@ -272,7 
+273,7 @@ public class ClustersImpl implements Clusters { if (null == cluster) { throw new ClusterNotFoundException(clusterName); } - + RetryHelper.addAffectedCluster(cluster); return cluster; } http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java new file mode 100644 index 0000000..9d2fe9e --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.utils; + +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.state.Cluster; +import org.eclipse.persistence.exceptions.DatabaseException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Provides static utility methods to support operation retries: records clusters for the current thread and calls invalidateData() on them on request. + * TODO: inject as a Guice singleton; static for now to avoid major modifications. + */ +public class RetryHelper { + private static ThreadLocal<Set<Cluster>> affectedClusters = new ThreadLocal<Set<Cluster>>(){ + @Override + protected Set<Cluster> initialValue() { + return new HashSet<>(); + } + }; + /** Retry attempt counts; both remain 0 until init is called. */ + private static int apiRetryAttempts = 0; + private static int blueprintsRetryAttempts = 0; + /** Stores the retry attempt counts for API and blueprint operations. */ + public static void init(int apiOperationsRetryAttempts, int blueprintOperationsRetryAttempts) { + apiRetryAttempts = apiOperationsRetryAttempts; + blueprintsRetryAttempts = blueprintOperationsRetryAttempts; + } + /** Records the cluster for the current thread, but only when at least one retry count is positive. */ + public static void addAffectedCluster(Cluster cluster) { + if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) { + affectedClusters.get().add(cluster); + } + } + /** Returns an unmodifiable view of the clusters recorded for the current thread. */ + public static Set<Cluster> getAffectedClusters() { + return Collections.unmodifiableSet(affectedClusters.get()); + } + /** Empties the current thread's recorded clusters, but only when at least one retry count is positive. */ + public static void clearAffectedClusters() { + if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) { + affectedClusters.get().clear(); + } + } + /** Returns the API retry attempt count last passed to init. */ + public static int getApiOperationsRetryAttempts() { + return apiRetryAttempts; + } + /** Returns true when ex or any exception in its cause chain is a DatabaseException. NOTE(review): ex.getCause() runs before any null check, so a null argument fails with NullPointerException. */ + public static boolean isDatabaseException(Throwable ex) { + do { + if (ex instanceof DatabaseException) { + return true; + } + ex = ex.getCause(); + + } while (ex != null); + + return false; + } + /** Calls invalidateData() on every cluster recorded for the current thread. */ + public static void invalidateAffectedClusters() { + for (Cluster cluster : affectedClusters.get()) { + cluster.invalidateData(); + } + } +}