Repository: ambari
Updated Branches:
  refs/heads/trunk 91eb6961e -> c254db4b3


AMBARI-10606. Ambari Agent needs to retry failed install/start operations


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c254db4b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c254db4b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c254db4b

Branch: refs/heads/trunk
Commit: c254db4b3592e910599ce25c7add8db2650ccfbb
Parents: 91eb696
Author: Sumit Mohanty <smoha...@hortonworks.com>
Authored: Tue Apr 21 13:29:52 2015 -0700
Committer: Sumit Mohanty <smoha...@hortonworks.com>
Committed: Tue Apr 21 13:29:52 2015 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  49 +++++--
 .../ambari_agent/CustomServiceOrchestrator.py   |   8 +-
 .../test/python/ambari_agent/TestActionQueue.py | 128 ++++++++++++++++++-
 .../TestCustomServiceOrchestrator.py            |  55 ++++++++
 .../ambari/server/agent/ExecutionCommand.java   |   4 +-
 .../server/configuration/Configuration.java     |  14 ++
 .../AmbariCustomCommandExecutionHelper.java     |  30 +++--
 .../AmbariManagementControllerImpl.java         |   4 +
 8 files changed, 266 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c4e2c33..212226c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -25,6 +25,8 @@ import threading
 import pprint
 import os
 import json
+from random import randint
+import time
 
 from AgentException import AgentException
 from LiveStatus import LiveStatus
@@ -223,16 +225,40 @@ class ActionQueue(threading.Thread):
 
     self.commandStatuses.put_command_status(command, in_progress_status)
 
-    # running command
-    commandresult = self.customServiceOrchestrator.runCommand(command,
-      in_progress_status['tmpout'], in_progress_status['tmperr'])
+    numAttempts = 0
+    maxAttempts = 1
+    retryAble = False
+    delay = 1
+    if 'commandParams' in command:
+      if 'command_retry_max_attempt_count' in command['commandParams']:
+        maxAttempts = 
int(command['commandParams']['command_retry_max_attempt_count'])
+      if 'command_retry_enabled' in command['commandParams']:
+        retryAble = command['commandParams']['command_retry_enabled'] == "true"
+
+    logger.debug("Command execution metadata - retry enabled = {retryAble}, 
max attempt count = {maxAttemptCount}".
+                 format(retryAble = retryAble, maxAttemptCount = maxAttempts))
+    while numAttempts < maxAttempts:
+      numAttempts += 1
+      # running command
+      commandresult = self.customServiceOrchestrator.runCommand(command,
+        in_progress_status['tmpout'], in_progress_status['tmperr'],
+        override_output_files=numAttempts == 1, retry=numAttempts > 1)
+
+
+      # dumping results
+      if isCommandBackground:
+        return
+      else:
+        status = self.COMPLETED_STATUS if commandresult['exitcode'] == 0 else 
self.FAILED_STATUS
 
+      if status != self.COMPLETED_STATUS and retryAble == True and maxAttempts 
> numAttempts:
+        delay = self.get_retry_delay(delay)
+        logger.info("Retrying command id {cid} after a wait of 
{delay}".format(cid = taskId, delay=delay))
+        time.sleep(delay)
+        continue
+      else:
+        break
 
-    # dumping results
-    if isCommandBackground:
-      return
-    else:
-      status = self.COMPLETED_STATUS if commandresult['exitcode'] == 0 else 
self.FAILED_STATUS
     roleResult = self.commandStatuses.generate_report_template(command)
     roleResult.update({
       'stdout': commandresult['stdout'],
@@ -289,6 +315,13 @@ class ActionQueue(threading.Thread):
 
     self.commandStatuses.put_command_status(command, roleResult)
 
+  def get_retry_delay(self, last_delay):
+    """
+    Returns exponentially growing delay. The idea being if number of retries is high then the reason to retry
+    is probably a host or environment specific issue requiring longer waits
+    """
+    return last_delay * 2
+
   def command_was_canceled(self):
     self.customServiceOrchestrator
   def on_background_command_complete_callback(self, process_condenced_result, 
handle):

http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py 
b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 72fb0af..54738a6 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -97,7 +97,7 @@ class CustomServiceOrchestrator():
         logger.warn("Unable to find pid by taskId = %s" % task_id)
 
   def runCommand(self, command, tmpoutfile, tmperrfile, 
forced_command_name=None,
-                 override_output_files = True):
+                 override_output_files=True, retry=False):
     """
     forced_command_name may be specified manually. In this case, value, 
defined at
     command json, is ignored.
@@ -155,7 +155,7 @@ class CustomServiceOrchestrator():
         handle.on_background_command_started = self.map_task_to_process
         del command['__handle']
 
-      json_path = self.dump_command_to_json(command)
+      json_path = self.dump_command_to_json(command, retry)
       pre_hook_tuple = self.resolve_hook_script_path(hook_dir,
           self.PRE_HOOK_PREFIX, command_name, script_type)
       post_hook_tuple = self.resolve_hook_script_path(hook_dir,
@@ -293,7 +293,7 @@ class CustomServiceOrchestrator():
     return hook_script_path, hook_base_dir
 
 
-  def dump_command_to_json(self, command):
+  def dump_command_to_json(self, command, retry=False):
     """
     Converts command to json file and returns file path
     """
@@ -311,7 +311,7 @@ class CustomServiceOrchestrator():
       file_path = os.path.join(self.tmp_dir, "status_command.json")
     else:
       task_id = command['taskId']
-      if 'clusterHostInfo' in command and command['clusterHostInfo']:
+      if 'clusterHostInfo' in command and command['clusterHostInfo'] and not 
retry:
         command['clusterHostInfo'] = 
self.decompressClusterHostInfo(command['clusterHostInfo'])
       file_path = os.path.join(self.tmp_dir, 
"command-{0}.json".format(task_id))
       if command_type == ActionQueue.AUTO_EXECUTION_COMMAND:

http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py 
b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index b9cbbe0..f43d3f7 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -163,6 +163,28 @@ class TestActionQueue(TestCase):
     'hostLevelParams': {}
   }
 
+  retryable_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': 'NAMENODE',
+    'roleCommand': 'INSTALL',
+    'commandId': '1-1',
+    'taskId': 19,
+    'clusterName': 'c1',
+    'serviceName': 'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v123' }},
+    'commandParams' :  {
+      'script_type' : 'PYTHON',
+      'script' : 'script.py',
+      'command_timeout' : '600',
+      'jdk_location' : '.',
+      'service_package_folder' : '.',
+      'command_retry_enabled' : 'true',
+      'command_retry_max_attempt_count' : '3'
+    },
+    'hostLevelParams' : {}
+  }
+
   background_command = {
     'commandType': 'BACKGROUND_EXECUTION_COMMAND',
     'role': 'NAMENODE',
@@ -310,7 +332,8 @@ class TestActionQueue(TestCase):
       'stderr': 'stderr',
       'structuredOut' : ''
       }
-    def side_effect(command, tmpoutfile, tmperrfile):
+
+    def side_effect(command, tmpoutfile, tmperrfile, 
override_output_files=True, retry=False):
       unfreeze_flag.wait()
       return python_execution_result_dict
     def patched_aq_execute_command(command):
@@ -635,6 +658,109 @@ class TestActionQueue(TestCase):
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not 
stopped.')
 
+
+  @patch("time.sleep")
+  @patch.object(OSCheck, "os_distribution", 
new=MagicMock(return_value=os_distro_value))
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
+                                     read_stack_version_mock, sleep_mock
+  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    python_execution_result_dict = {
+      'exitcode': 1,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'FAILED'
+    }
+
+    def side_effect(command, tmpoutfile, tmperrfile, 
override_output_files=True, retry=False):
+      return python_execution_result_dict
+
+    command = copy.deepcopy(self.retryable_command)
+    with patch.object(CustomServiceOrchestrator, "runCommand") as 
runCommand_mock:
+      runCommand_mock.side_effect = side_effect
+      actionQueue.execute_command(command)
+
+    #assert that python executor start
+    self.assertTrue(runCommand_mock.called)
+    self.assertEqual(3, runCommand_mock.call_count)
+    self.assertEqual(2, sleep_mock.call_count)
+    sleep_mock.assert_has_calls([call(2), call(4)], False)
+    runCommand_mock.assert_has_calls([
+      call(command, '/tmp/ambari-agent/output-19.txt', 
'/tmp/ambari-agent/errors-19.txt', override_output_files=True, retry=False),
+      call(command, '/tmp/ambari-agent/output-19.txt', 
'/tmp/ambari-agent/errors-19.txt', override_output_files=False, retry=True),
+      call(command, '/tmp/ambari-agent/output-19.txt', 
'/tmp/ambari-agent/errors-19.txt', override_output_files=False, retry=True)])
+
+
+  #retryable_command
+  @patch("time.sleep")
+  @patch.object(OSCheck, "os_distribution", 
new=MagicMock(return_value=os_distro_value))
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_retryable_command_fail_and_succeed(self, 
CustomServiceOrchestrator_mock,
+                                                      read_stack_version_mock, 
sleep_mock
+  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    execution_result_fail_dict = {
+      'exitcode': 1,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'FAILED'
+    }
+    execution_result_succ_dict = {
+      'exitcode': 0,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'COMPLETED'
+    }
+
+    command = copy.deepcopy(self.retryable_command)
+    with patch.object(CustomServiceOrchestrator, "runCommand") as 
runCommand_mock:
+      runCommand_mock.side_effect = [execution_result_fail_dict, 
execution_result_succ_dict]
+      actionQueue.execute_command(command)
+
+    #assert that python executor start
+    self.assertTrue(runCommand_mock.called)
+    self.assertEqual(2, runCommand_mock.call_count)
+    self.assertEqual(1, sleep_mock.call_count)
+    sleep_mock.assert_any_call(2)
+
+  @patch("time.sleep")
+  @patch.object(OSCheck, "os_distribution", 
new=MagicMock(return_value=os_distro_value))
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_retryable_command_succeed(self, 
CustomServiceOrchestrator_mock,
+                                             read_stack_version_mock, 
sleep_mock
+  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    execution_result_succ_dict = {
+      'exitcode': 0,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'COMPLETED'
+    }
+
+    command = copy.deepcopy(self.retryable_command)
+    with patch.object(CustomServiceOrchestrator, "runCommand") as 
runCommand_mock:
+      runCommand_mock.side_effect = [execution_result_succ_dict]
+      actionQueue.execute_command(command)
+
+    #assert that python executor start
+    self.assertTrue(runCommand_mock.called)
+    self.assertFalse(sleep_mock.called)
+    self.assertEqual(1, runCommand_mock.call_count)
+
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = 
os_distro_value))
   @patch.object(StackVersionsFileHandler, "read_stack_version")
   @patch.object(CustomServiceOrchestrator, "runCommand")

http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py 
b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 811cf26..a9e604d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -144,6 +144,61 @@ class TestCustomServiceOrchestrator(TestCase):
 
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = 
os_distro_value))
+  @patch("hostname.public_hostname")
+  @patch("os.path.isfile")
+  @patch("os.unlink")
+  @patch.object(FileCache, "__init__")
+  def test_dump_command_to_json_with_retry(self, FileCache_mock, unlink_mock,
+                                isfile_mock, hostname_mock):
+    FileCache_mock.return_value = None
+    hostname_mock.return_value = "test.hst"
+    command = {
+      'commandType': 'EXECUTION_COMMAND',
+      'role': u'DATANODE',
+      'roleCommand': u'INSTALL',
+      'commandId': '1-1',
+      'taskId': 3,
+      'clusterName': u'cc',
+      'serviceName': u'HDFS',
+      'configurations':{'global' : {}},
+      'configurationTags':{'global' : { 'tag': 'v1' }},
+      'clusterHostInfo':{'namenode_host' : ['1'],
+                         'slave_hosts'   : ['0', '1'],
+                         'all_racks'   : [u'/default-rack:0'],
+                         'ambari_server_host' : 'a.b.c',
+                         'all_ipv4_ips'   : [u'192.168.12.101:0'],
+                         'all_hosts'     : ['h1.hortonworks.com', 
'h2.hortonworks.com'],
+                         'all_ping_ports': ['8670:0,1']},
+      'hostLevelParams':{}
+    }
+
+    config = AmbariConfig().getConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(config, dummy_controller)
+    isfile_mock.return_value = True
+    # Test dumping EXECUTION_COMMAND
+    json_file = orchestrator.dump_command_to_json(command)
+    self.assertTrue(os.path.exists(json_file))
+    self.assertTrue(os.path.getsize(json_file) > 0)
+    if get_platform() != PLATFORM_WINDOWS:
+      self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
+    self.assertTrue(json_file.endswith("command-3.json"))
+    os.unlink(json_file)
+    # Test dumping EXECUTION_COMMAND again with retry=True
+    json_file = orchestrator.dump_command_to_json(command, True)
+    self.assertTrue(os.path.exists(json_file))
+    self.assertTrue(os.path.getsize(json_file) > 0)
+    if get_platform() != PLATFORM_WINDOWS:
+      self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
+    self.assertTrue(json_file.endswith("command-3.json"))
+    os.unlink(json_file)
+    # Testing side effect of dump_command_to_json
+    self.assertEquals(command['public_hostname'], "test.hst")
+    self.assertTrue(unlink_mock.called)
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = 
os_distro_value))
   @patch("os.path.exists")
   @patch.object(FileCache, "__init__")
   def test_resolve_script_path(self, FileCache_mock, exists_mock):

http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 7b9709e..7f588fe 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -306,8 +306,10 @@ public class ExecutionCommand extends AgentCommand {
     String GROUP_LIST = "group_list";
     String VERSION = "version";
     String REFRESH_TOPOLOGY = "refresh_topology";
+    String COMMAND_RETRY_MAX_ATTEMPT_COUNT = "command_retry_max_attempt_count";
+    String COMMAND_RETRY_ENABLED = "command_retry_enabled";
 
-    String SERVICE_CHECK = "SERVICE_CHECK"; // TODO: is it standart command? 
maybe add it to RoleCommand enum?
+    String SERVICE_CHECK = "SERVICE_CHECK"; // TODO: is it standard command? 
maybe add it to RoleCommand enum?
     String CUSTOM_COMMAND = "custom_command";
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 2ff3a74..cd2bafd 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -349,6 +349,10 @@ public class Configuration {
   private static final String DEFAULT_JDBC_POOL_MAX_AGE_SECONDS = "0";
   private static final String DEFAULT_JDBC_POOL_IDLE_TEST_INTERVAL = "7200";
 
+  private static final String IS_COMMAND_RETRY_ENABLED_KEY = 
"command.retry.enabled";
+  private static final String IS_COMMAND_RETRY_ENABLED_DEFAULT = "false";
+  private static final String COMMAND_RETRY_COUNT_KEY = "command.retry.count";
+  private static final String COMMAND_RETRY_COUNT_DEFAULT = "3";
   /**
    * The full path to the XML file that describes the different alert 
templates.
    */
@@ -1129,6 +1133,16 @@ public class Configuration {
   }
 
   /**
+   * Command retry configs
+   */
+  public boolean isCommandRetryEnabled() {
+    return 
Boolean.parseBoolean(properties.getProperty(IS_COMMAND_RETRY_ENABLED_KEY, 
IS_COMMAND_RETRY_ENABLED_DEFAULT));
+  }
+
+  public int commandRetryCount() {
+    return Integer.parseInt(properties.getProperty(COMMAND_RETRY_COUNT_KEY, 
COMMAND_RETRY_COUNT_DEFAULT));
+  }
+  /**
    * @return custom properties for database connections
    */
   public Map<String,String> getDatabaseCustomProperties() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index 50797a4..f585e28 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -19,6 +19,8 @@
 package org.apache.ambari.server.controller;
 
 import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.CLIENTS_TO_UPDATE_CONFIGS;
+import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMMAND_RETRY_ENABLED;
+import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMMAND_RETRY_MAX_ATTEMPT_COUNT;
 import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMMAND_TIMEOUT;
 import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMPONENT_CATEGORY;
 import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.CUSTOM_COMMAND;
@@ -175,8 +177,8 @@ public class AmbariCustomCommandExecutionHelper {
       return false;
     }
     ComponentInfo componentInfo = ambariMetaInfo.getComponent(
-      stackId.getStackName(), stackId.getStackVersion(),
-      serviceName, componentName);
+        stackId.getStackName(), stackId.getStackVersion(),
+        serviceName, componentName);
 
     return !(!componentInfo.isCustomCommand(commandName) &&
       !actionMetadata.isDefaultHostComponentCommand(commandName));
@@ -247,15 +249,15 @@ public class AmbariCustomCommandExecutionHelper {
     Set<String> candidateHosts = new 
HashSet<String>(resourceFilter.getHostNames());
     // Filter hosts that are in MS
     Set<String> ignoredHosts = 
maintenanceStateHelper.filterHostsInMaintenanceState(
-            candidateHosts, new MaintenanceStateHelper.HostPredicate() {
-              @Override
-              public boolean shouldHostBeRemoved(final String hostname)
-                      throws AmbariException {
-                return !maintenanceStateHelper.isOperationAllowed(
-                        cluster, actionExecutionContext.getOperationLevel(),
-                        resourceFilter, serviceName, componentName, hostname);
-              }
-            }
+        candidateHosts, new MaintenanceStateHelper.HostPredicate() {
+          @Override
+          public boolean shouldHostBeRemoved(final String hostname)
+              throws AmbariException {
+            return !maintenanceStateHelper.isOperationAllowed(
+                cluster, actionExecutionContext.getOperationLevel(),
+                resourceFilter, serviceName, componentName, hostname);
+          }
+        }
     );
 
     // Filter unhealthy hosts
@@ -279,7 +281,7 @@ public class AmbariCustomCommandExecutionHelper {
     StackId stackId = cluster.getDesiredStackVersion();
     AmbariMetaInfo ambariMetaInfo = managementController.getAmbariMetaInfo();
     ServiceInfo serviceInfo = ambariMetaInfo.getService(
-       stackId.getStackName(), stackId.getStackVersion(), serviceName);
+        stackId.getStackName(), stackId.getStackVersion(), serviceName);
     StackInfo stackInfo = ambariMetaInfo.getStack
        (stackId.getStackName(), stackId.getStackVersion());
 
@@ -367,6 +369,8 @@ public class AmbariCustomCommandExecutionHelper {
         if (script != null) {
           commandParams.put(SCRIPT, script.getScript());
           commandParams.put(SCRIPT_TYPE, script.getScriptType().toString());
+          commandParams.put(COMMAND_RETRY_MAX_ATTEMPT_COUNT, 
Integer.toString(configs.commandRetryCount()));
+          commandParams.put(COMMAND_RETRY_ENABLED, 
Boolean.toString(configs.isCommandRetryEnabled()));
           if (script.getTimeout() > 0) {
             commandTimeout = String.valueOf(script.getTimeout());
           }
@@ -564,6 +568,8 @@ public class AmbariCustomCommandExecutionHelper {
       if (script != null) {
         commandParams.put(SCRIPT, script.getScript());
         commandParams.put(SCRIPT_TYPE, script.getScriptType().toString());
+        commandParams.put(COMMAND_RETRY_MAX_ATTEMPT_COUNT, 
Integer.toString(configs.commandRetryCount()));
+        commandParams.put(COMMAND_RETRY_ENABLED, 
Boolean.toString(configs.isCommandRetryEnabled()));
         if (script.getTimeout() > 0) {
           commandTimeout = String.valueOf(script.getTimeout());
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c254db4b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index fedf18e..a57a150 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -35,6 +35,8 @@ import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_P
 import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_REPO_INFO;
 import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.USER_LIST;
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.VERSION;
+import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMMAND_RETRY_MAX_ATTEMPT_COUNT;
+import static 
org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMMAND_RETRY_ENABLED;
 
 import java.io.File;
 import java.io.FileReader;
@@ -1760,6 +1762,8 @@ public class AmbariManagementControllerImpl implements 
AmbariManagementControlle
       if (script != null) {
         commandParams.put(SCRIPT, script.getScript());
         commandParams.put(SCRIPT_TYPE, script.getScriptType().toString());
+        commandParams.put(COMMAND_RETRY_MAX_ATTEMPT_COUNT, 
Integer.toString(configs.commandRetryCount()));
+        commandParams.put(COMMAND_RETRY_ENABLED, 
Boolean.toString(configs.isCommandRetryEnabled()));
         ClusterVersionEntity currentClusterVersion = 
cluster.getCurrentClusterVersion();
         if (currentClusterVersion != null) {
          commandParams.put(VERSION, 
currentClusterVersion.getRepositoryVersion().getVersion());

Reply via email to