Repository: ambari Updated Branches: refs/heads/branch-2.2 91a5b3709 -> 4a935ce05
AMBARI-15762. Component install post processing can not be run in parallel (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4a935ce0 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4a935ce0 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4a935ce0 Branch: refs/heads/branch-2.2 Commit: 4a935ce05d71e7d05c64c717333863e6586c41a5 Parents: 91a5b37 Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Tue Apr 12 17:59:19 2016 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Tue Apr 12 17:59:19 2016 +0300 ---------------------------------------------------------------------- .../ambari_agent/CustomServiceOrchestrator.py | 1 + .../TestCustomServiceOrchestrator.py | 6 +- .../TestFcntlBasedProcessLock.py | 63 +++++++++++ .../python/resource_management/core/logger.py | 4 + .../functions/fcntl_based_process_lock.py | 111 +++++++++++++++++++ .../2.0.6/hooks/after-INSTALL/scripts/params.py | 8 ++ .../scripts/shared_initialization.py | 13 ++- 7 files changed, 201 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 6c1a161..9ad520f 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -315,6 +315,7 @@ class CustomServiceOrchestrator(): command['public_hostname'] = public_fqdn # Add cache dir to make it visible for commands command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir') + command["agentConfigParams"] = 
{"agent": {"parallel_execution": self.config.get_parallel_exec_option()}} # Now, dump the json file command_type = command['commandType'] from ActionQueue import ActionQueue # To avoid cyclic dependency http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py index e08e2f7..60cab9f 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py +++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py @@ -111,7 +111,7 @@ class TestCustomServiceOrchestrator(TestCase): 'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'], 'all_ping_ports': ['8670', '8670']} - config = AmbariConfig().getConfig() + config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) dummy_controller = MagicMock() @@ -139,6 +139,7 @@ class TestCustomServiceOrchestrator(TestCase): os.unlink(json_file) # Testing side effect of dump_command_to_json self.assertEquals(command['public_hostname'], "test.hst") + self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 0) self.assertTrue(unlink_mock.called) @@ -171,7 +172,7 @@ class TestCustomServiceOrchestrator(TestCase): 'hostLevelParams':{} } - config = AmbariConfig().getConfig() + config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) dummy_controller = MagicMock() @@ -195,6 +196,7 @@ class TestCustomServiceOrchestrator(TestCase): os.unlink(json_file) # Testing side effect of dump_command_to_json self.assertEquals(command['public_hostname'], "test.hst") + self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 0) self.assertTrue(unlink_mock.called) @patch.object(OSCheck, 
class TestFcntlBasedProcessLock(TestCase):

  @not_for_platform(PLATFORM_WINDOWS)
  def test_fcntl_based_lock(self):
    """
    Verify that FcntlBasedProcessLock provides mutual exclusion between
    processes, using a non-blocking multiprocessing.Lock as the detector:
    if two processes were ever inside the fcntl lock at once, the second
    mutex.acquire(block=False) would fail and the child would exit non-zero.
    """
    test_temp_dir = tempfile.mkdtemp(prefix="test_file_based_lock")
    try:
      lock_file = os.path.join(test_temp_dir, "lock")

      # Raises an exception if mutex.acquire fails. That indicates
      # more than one process acquired the FcntlBasedProcessLock.
      def dummy_task(index, mutex):
        with FcntlBasedProcessLock(lock_file, skip_fcntl_failures = False):
          if (not mutex.acquire(block = False)):
            raise Exception("ERROR: FcntlBasedProcessLock was acquired by several processes")
          time.sleep(0.1)
          mutex.release()

      mutex = multiprocessing.Lock()
      process_list = []
      for i in range(3):
        p = multiprocessing.Process(target=dummy_task, args=(i, mutex))
        p.start()
        process_list.append(p)

      for p in process_list:
        p.join(2)
        # A child that deadlocked leaves exitcode None after the timed join;
        # a child that detected double-acquisition exits non-zero.
        # NOTE: assertEqual, not the deprecated assertEquals alias
        # (the alias was removed in Python 3.12).
        self.assertEqual(p.exitcode, 0)

    finally:
      shutil.rmtree(test_temp_dir)
class FcntlBasedProcessLock(object):
  """A file descriptor based lock for interprocess locking.
  The lock is automatically released when the owning process dies.

  WARNING: The file system and OS must support fcntl.lockf. Doesn't work
  on Windows systems. Doesn't work properly on some NFS implementations.

  Currently Ambari uses FcntlBasedProcessLock only when parallel
  execution is enabled on the agent.

  WARNING: Do not use this lock for synchronization between threads.
  Multiple threads in a same process can simultaneously acquire this lock.
  It should be used only for locking between processes.
  """

  def __init__(self, lock_file_path, skip_fcntl_failures, enabled = True):
    """
    :param lock_file_path: The path to the file used for locking
    :param skip_fcntl_failures: Use this only if the lock is not mandatory.
        If set to True, failures of the fcntl call are logged and ignored.
        Locking will not work if fcntl is not supported; skip_fcntl_failures
        prevents exceptions from being raised in that case.
    :param enabled: If set to False, fcntl will not be imported and
        lock/unlock methods return immediately.
    """
    self.lock_file_name = lock_file_path
    self.lock_file = None       # open file object backing the lock; None when closed
    self.acquired = False       # True only while this process holds the fcntl lock
    self.skip_fcntl_failures = skip_fcntl_failures
    self.enabled = enabled

  def blocking_lock(self):
    """
    Creates the lock file if it doesn't exist.
    Waits (blocks) to acquire an exclusive lock on the lock file descriptor.
    """
    if not self.enabled:
      return
    import fcntl  # imported lazily: unavailable on Windows
    Logger.info("Trying to acquire a lock on {0}".format(self.lock_file_name))
    # Reuse the already-open descriptor if we have one; 'a' avoids truncating
    # the file another process may currently hold a lock on.
    if self.lock_file is None or self.lock_file.closed:
      self.lock_file = open(self.lock_file_name, 'a')
    try:
      fcntl.lockf(self.lock_file, fcntl.LOCK_EX)
    except Exception:
      # was a bare 'except:': that would also swallow KeyboardInterrupt/SystemExit
      if self.skip_fcntl_failures:
        Logger.exception("Fcntl call raised an exception. A lock was not acquired. "
                         "Continuing as skip_fcntl_failures is set to True")
      else:
        raise
    else:
      self.acquired = True
      Logger.info("Acquired the lock on {0}".format(self.lock_file_name))

  def unlock(self):
    """
    Unlocks (if held) and closes the lock file descriptor.
    Safe to call even if blocking_lock() was never called or failed.
    """
    if not self.enabled:
      return
    import fcntl
    Logger.info("Releasing the lock on {0}".format(self.lock_file_name))
    if self.acquired:
      try:
        fcntl.lockf(self.lock_file, fcntl.LOCK_UN)
      except Exception:
        if self.skip_fcntl_failures:
          Logger.exception("Fcntl call raised an exception. The lock was not released. "
                           "Continuing as skip_fcntl_failures is set to True")
        else:
          raise
      else:
        self.acquired = False
    # Guard against unlock() without a prior successful blocking_lock():
    # self.lock_file may still be None, and None.close() would raise an
    # AttributeError that the IOError handler below does not catch.
    if self.lock_file is not None:
      try:
        self.lock_file.close()
        self.lock_file = None
      except IOError:
        Logger.warning("Failed to close {0}".format(self.lock_file_name))

  def __enter__(self):
    # Intentionally returns None (callers use 'with lock:', not 'with lock as x:')
    self.blocking_lock()
    return None

  def __exit__(self, exc_type, exc_val, exc_tb):
    self.unlock()
    return False  # never suppress exceptions raised inside the with-block
""" +import os + from ambari_commons.constants import AMBARI_SUDO_BINARY from resource_management.libraries.script import Script from resource_management.libraries.functions import default @@ -26,9 +28,12 @@ from resource_management.libraries.functions import format_jvm_option from resource_management.libraries.functions.version import format_hdp_stack_version config = Script.get_config() +tmp_dir = Script.get_tmp_dir() dfs_type = default("/commandParams/dfs_type", "") +is_parallel_execution_enabled = int(default("/agentConfigParams/agent/parallel_execution", 0)) == 1 + sudo = AMBARI_SUDO_BINARY stack_version_unformatted = str(config['hostLevelParams']['stack_version']) @@ -89,3 +94,6 @@ has_namenode = not len(namenode_host) == 0 if has_namenode or dfs_type == 'HCFS': hadoop_conf_dir = conf_select.get_hadoop_conf_dir(force_latest_on_upgrade=True) + +link_configs_lock_file = os.path.join(tmp_dir, "link_configs_lock_file") +hdp_select_lock_file = os.path.join(tmp_dir, "hdp_select_lock_file") http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py index 8ee2f7a..1c53b51 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py @@ -24,6 +24,7 @@ from resource_management.libraries.functions import conf_select from resource_management.libraries.functions import hdp_select from resource_management.libraries.functions.format import format from resource_management.libraries.functions.version import compare_versions 
+from resource_management.libraries.functions.fcntl_based_process_lock import FcntlBasedProcessLock from resource_management.libraries.resources.xml_config import XmlConfig from resource_management.libraries.script import Script @@ -41,7 +42,10 @@ def setup_hdp_symlinks(): # try using the exact version first, falling back in just the stack if it's not defined # which would only be during an intial cluster installation version = params.current_version if params.current_version is not None else params.stack_version_unformatted - hdp_select.select_all(version) + + # On parallel command execution this should be executed by a single process at a time. + with FcntlBasedProcessLock(params.hdp_select_lock_file, enabled = params.is_parallel_execution_enabled, skip_fcntl_failures = True): + hdp_select.select_all(version) def setup_config(): @@ -85,6 +89,7 @@ def link_configs(struct_out_file): """ Links configs, only on a fresh install of HDP-2.3 and higher """ + import params if not Script.is_hdp_stack_greater_or_equal("2.3"): Logger.info("Can only link configs for HDP-2.3 and higher.") @@ -96,5 +101,7 @@ def link_configs(struct_out_file): Logger.info("Could not load 'version' from {0}".format(struct_out_file)) return - for k, v in conf_select.PACKAGE_DIRS.iteritems(): - conf_select.convert_conf_directories_to_symlinks(k, json_version, v) + # On parallel command execution this should be executed by a single process at a time. + with FcntlBasedProcessLock(params.link_configs_lock_file, enabled = params.is_parallel_execution_enabled, skip_fcntl_failures = True): + for k, v in conf_select.PACKAGE_DIRS.iteritems(): + conf_select.convert_conf_directories_to_symlinks(k, json_version, v) \ No newline at end of file