Repository: ambari Updated Branches: refs/heads/trunk 14fcf31aa -> 49c7dcd15
Revert "AMBARI-15762. Component install post processing can not be run in parallel (aonishuk)" This reverts commit 5f0a09690310fee76c4cab3639c1c4ec08ecca8a. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/49c7dcd1 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/49c7dcd1 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/49c7dcd1 Branch: refs/heads/trunk Commit: 49c7dcd1557b294af78ec90030ff6df7bd0284ed Parents: 14fcf31 Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Fri Apr 8 07:32:59 2016 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Fri Apr 8 07:32:59 2016 -0700 ---------------------------------------------------------------------- .../TestFileBasedProcessLock.py | 61 ---------------- .../functions/file_based_process_lock.py | 73 -------------------- .../2.0.6/hooks/after-INSTALL/scripts/params.py | 5 -- .../scripts/shared_initialization.py | 8 +-- 4 files changed, 2 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/49c7dcd1/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py b/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py deleted file mode 100644 index e4606cc..0000000 --- a/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py +++ /dev/null @@ -1,61 +0,0 @@ -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -import os -import tempfile -import time -import shutil -from unittest import TestCase -from multiprocessing import Process -from only_for_platform import not_for_platform, PLATFORM_WINDOWS -from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock - -class TestFileBasedProcessLock(TestCase): - - - @not_for_platform(PLATFORM_WINDOWS) - def test_file_based_lock(self): - """ - Test BlockingLock using mkdir atomicity. - """ - test_temp_dir = tempfile.mkdtemp(prefix="test_file_based_lock") - try: - indicator_dir = os.path.join(test_temp_dir, "indicator") - lock_file = os.path.join(test_temp_dir, "lock") - - # Raises an exception if mkdir operation fails. - # It indicates that more than one process acquired the lock. - def dummy_task(index): - with FileBasedProcessLock(lock_file): - os.mkdir(indicator_dir) - time.sleep(0.1) - os.rmdir(indicator_dir) - - process_list = [] - for i in range(0, 3): - p = Process(target=dummy_task, args=(i,)) - p.start() - process_list.append(p) - - for p in process_list: - p.join(2) - self.assertEquals(p.exitcode, 0) - - finally: - shutil.rmtree(test_temp_dir) - http://git-wip-us.apache.org/repos/asf/ambari/blob/49c7dcd1/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py b/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py deleted file mode 100644 index f9c981d..0000000 --- a/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env python -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -Ambari Agent - -""" - -import fcntl - -from resource_management.core.logger import Logger - -class FileBasedProcessLock(object): - """A file descriptor based lock for interprocess locking. - The lock is automatically released when process dies. - - WARNING: Do not use this lock for synchronization between threads. - Multiple threads in a same process can simultaneously acquire this lock. - It should be used only for locking between processes. - """ - - def __init__(self, lock_file_path): - """ - :param lock_file_path: The path to the file used for locking - """ - self.lock_file_name = lock_file_path - self.lock_file = None - - def blocking_lock(self): - """ - Creates the lock file if it doesn't exist. - Waits to acquire an exclusive lock on the lock file descriptor. - """ - Logger.info("Trying to acquire a lock on {0}".format(self.lock_file_name)) - if self.lock_file is None or self.lock_file.closed: - self.lock_file = open(self.lock_file_name, 'a') - fcntl.lockf(self.lock_file, fcntl.LOCK_EX) - Logger.info("Acquired the lock on {0}".format(self.lock_file_name)) - - def unlock(self): - """ - Unlocks the lock file descriptor. - """ - Logger.info("Releasing the lock on {0}".format(self.lock_file_name)) - fcntl.lockf(self.lock_file, fcntl.LOCK_UN) - try: - if self.lock_file is not None: - self.lock_file.close() - self.lock_file = None - except IOError: - pass - - def __enter__(self): - self.blocking_lock() - return None - - def __exit__(self, exc_type, exc_val, exc_tb): - self.unlock() - return False \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/49c7dcd1/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py index 9cef622..9f4971d 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py @@ -17,8 +17,6 @@ limitations under the License. """ -import os - from ambari_commons.constants import AMBARI_SUDO_BINARY from resource_management.libraries.script import Script from resource_management.libraries.functions import default @@ -28,7 +26,6 @@ from resource_management.libraries.functions import format_jvm_option from resource_management.libraries.functions.version import format_stack_version config = Script.get_config() -tmp_dir = Script.get_tmp_dir() dfs_type = default("/commandParams/dfs_type", "") @@ -92,5 +89,3 @@ has_namenode = not len(namenode_host) == 0 if has_namenode or dfs_type == 'HCFS': hadoop_conf_dir = conf_select.get_hadoop_conf_dir(force_latest_on_upgrade=True) - -link_configs_lock_file = os.path.join(tmp_dir, "link_configs_lock_file") http://git-wip-us.apache.org/repos/asf/ambari/blob/49c7dcd1/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py index b83b115..c772ddb 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py @@ -24,7 +24,6 @@ from resource_management.libraries.functions import conf_select from resource_management.libraries.functions import stack_select from resource_management.libraries.functions.format import format from resource_management.libraries.functions.version import compare_versions -from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock from resource_management.libraries.resources.xml_config import XmlConfig from resource_management.libraries.script import Script @@ -87,7 +86,6 @@ def link_configs(struct_out_file): """ Links configs, only on a fresh install of HDP-2.3 and higher """ - import params if not Script.is_stack_greater_or_equal("2.3"): Logger.info("Can only link configs for HDP-2.3 and higher.") @@ -99,7 +97,5 @@ def link_configs(struct_out_file): Logger.info("Could not load 'version' from {0}".format(struct_out_file)) return - # On parallel command execution this should be executed by a single process at a time. - with FileBasedProcessLock(params.link_configs_lock_file): - for k, v in conf_select.get_package_dirs().iteritems(): - conf_select.convert_conf_directories_to_symlinks(k, json_version, v) + for k, v in conf_select.get_package_dirs().iteritems(): + conf_select.convert_conf_directories_to_symlinks(k, json_version, v)