Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 91a5b3709 -> 4a935ce05


AMBARI-15762. Component install post processing can not be run in parallel 
(aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4a935ce0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4a935ce0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4a935ce0

Branch: refs/heads/branch-2.2
Commit: 4a935ce05d71e7d05c64c717333863e6586c41a5
Parents: 91a5b37
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Tue Apr 12 17:59:19 2016 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Tue Apr 12 17:59:19 2016 +0300

----------------------------------------------------------------------
 .../ambari_agent/CustomServiceOrchestrator.py   |   1 +
 .../TestCustomServiceOrchestrator.py            |   6 +-
 .../TestFcntlBasedProcessLock.py                |  63 +++++++++++
 .../python/resource_management/core/logger.py   |   4 +
 .../functions/fcntl_based_process_lock.py       | 111 +++++++++++++++++++
 .../2.0.6/hooks/after-INSTALL/scripts/params.py |   8 ++
 .../scripts/shared_initialization.py            |  13 ++-
 7 files changed, 201 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py 
b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 6c1a161..9ad520f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -315,6 +315,7 @@ class CustomServiceOrchestrator():
     command['public_hostname'] = public_fqdn
     # Add cache dir to make it visible for commands
     command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 
'cache_dir')
+    command["agentConfigParams"] = {"agent": {"parallel_execution": 
self.config.get_parallel_exec_option()}}
     # Now, dump the json file
     command_type = command['commandType']
     from ActionQueue import ActionQueue  # To avoid cyclic dependency

http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py 
b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index e08e2f7..60cab9f 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -111,7 +111,7 @@ class TestCustomServiceOrchestrator(TestCase):
                          'all_hosts'     : ['h1.hortonworks.com', 
'h2.hortonworks.com'],
                          'all_ping_ports': ['8670', '8670']}
     
-    config = AmbariConfig().getConfig()
+    config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     dummy_controller = MagicMock()
@@ -139,6 +139,7 @@ class TestCustomServiceOrchestrator(TestCase):
     os.unlink(json_file)
     # Testing side effect of dump_command_to_json
     self.assertEquals(command['public_hostname'], "test.hst")
+    
self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 
0)
     self.assertTrue(unlink_mock.called)
 
 
@@ -171,7 +172,7 @@ class TestCustomServiceOrchestrator(TestCase):
       'hostLevelParams':{}
     }
 
-    config = AmbariConfig().getConfig()
+    config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     dummy_controller = MagicMock()
@@ -195,6 +196,7 @@ class TestCustomServiceOrchestrator(TestCase):
     os.unlink(json_file)
     # Testing side effect of dump_command_to_json
     self.assertEquals(command['public_hostname'], "test.hst")
+    
self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 
0)
     self.assertTrue(unlink_mock.called)
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = 
os_distro_value))

http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-agent/src/test/python/resource_management/TestFcntlBasedProcessLock.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/resource_management/TestFcntlBasedProcessLock.py 
b/ambari-agent/src/test/python/resource_management/TestFcntlBasedProcessLock.py
new file mode 100644
index 0000000..be6ff80
--- /dev/null
+++ 
b/ambari-agent/src/test/python/resource_management/TestFcntlBasedProcessLock.py
@@ -0,0 +1,63 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import tempfile
+import time
+import shutil
+import multiprocessing
+from unittest import TestCase
+
+from only_for_platform import  not_for_platform, PLATFORM_WINDOWS
+from resource_management.libraries.functions.fcntl_based_process_lock import 
FcntlBasedProcessLock
+
class TestFcntlBasedProcessLock(TestCase):


  @not_for_platform(PLATFORM_WINDOWS)
  def test_fcntl_based_lock(self):
    """
    Test blocking_lock using multiprocessing.Lock
    """
    work_dir = tempfile.mkdtemp(prefix="test_file_based_lock")
    try:
      lock_file = os.path.join(work_dir, "lock")

      # Each worker takes the file lock, then tries a non-blocking acquire of a
      # shared multiprocessing mutex. If that acquire ever fails, two workers
      # were inside the file lock at once, and the worker dies with an error.
      def dummy_task(index, mutex):
        with FcntlBasedProcessLock(lock_file, skip_fcntl_failures = False):
          if (not mutex.acquire(block = False)):
            raise Exception("ERROR: FcntlBasedProcessLock was acquired by several processes")
          time.sleep(0.1)
          mutex.release()

      mutex = multiprocessing.Lock()
      workers = [multiprocessing.Process(target=dummy_task, args=(idx, mutex))
                 for idx in range(0, 3)]
      for worker in workers:
        worker.start()

      # Every worker must finish cleanly (exit code 0) within the timeout.
      for worker in workers:
        worker.join(2)
        self.assertEquals(worker.exitcode, 0)

    finally:
      shutil.rmtree(work_dir)

http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-common/src/main/python/resource_management/core/logger.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/logger.py 
b/ambari-common/src/main/python/resource_management/core/logger.py
index 367b24e..dcd43a0 100644
--- a/ambari-common/src/main/python/resource_management/core/logger.py
+++ b/ambari-common/src/main/python/resource_management/core/logger.py
@@ -55,6 +55,10 @@ class Logger:
     Logger.logger = logger
 
  @staticmethod
  def exception(text):
    # Delegates to logging's exception(): logs at ERROR level and appends the
    # traceback of the exception currently being handled, so this should be
    # called from inside an 'except' block. Text is passed through
    # Logger.filter_text, matching the other Logger static methods.
    Logger.logger.exception(Logger.filter_text(text))
+
+  @staticmethod
   def error(text):
     Logger.logger.error(Logger.filter_text(text))
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-common/src/main/python/resource_management/libraries/functions/fcntl_based_process_lock.py
----------------------------------------------------------------------
diff --git 
a/ambari-common/src/main/python/resource_management/libraries/functions/fcntl_based_process_lock.py
 
b/ambari-common/src/main/python/resource_management/libraries/functions/fcntl_based_process_lock.py
new file mode 100644
index 0000000..67fadac
--- /dev/null
+++ 
b/ambari-common/src/main/python/resource_management/libraries/functions/fcntl_based_process_lock.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.logger import Logger
+
class FcntlBasedProcessLock(object):
  """A file descriptor based lock for interprocess locking.
  The lock is automatically released when process dies.

  WARNING: A file system and OS must support fcntl.lockf.
  Doesn't work on Windows systems. Doesn't work properly on
  some NFS implementations.

  Currently Ambari uses FcntlBasedProcessLock only when parallel
  execution is enabled on the agent.

  WARNING: Do not use this lock for synchronization between threads.
  Multiple threads in a same process can simultaneously acquire this lock.
  It should be used only for locking between processes.
  """

  def __init__(self, lock_file_path, skip_fcntl_failures, enabled = True):
    """
    :param lock_file_path: The path to the file used for locking
    :param skip_fcntl_failures: Use this only if the lock is not mandatory.
                                If set to True, the lock will ignore fcntl call failures.
                                Locking will not work, if fcntl is not supported.
                                skip_fcntl_failures prevents exceptions raising in this case.
    :param enabled: If is set to False, fcntl will not be imported and lock/unlock methods return immediately.
    """
    self.lock_file_name = lock_file_path
    # Open file object backing the lock; created lazily in blocking_lock().
    self.lock_file = None
    # True only after fcntl.lockf(LOCK_EX) has succeeded.
    self.acquired = False
    self.skip_fcntl_failures = skip_fcntl_failures
    self.enabled = enabled

  def blocking_lock(self):
    """
    Creates the lock file if it doesn't exist.
    Waits (blocks) to acquire an exclusive lock on the lock file descriptor.

    Raises the underlying fcntl error unless skip_fcntl_failures is True,
    in which case the failure is only logged and no lock is held.
    """
    if not self.enabled:
      return
    # Imported lazily: the fcntl module does not exist on Windows.
    import fcntl
    Logger.info("Trying to acquire a lock on {0}".format(self.lock_file_name))
    if self.lock_file is None or self.lock_file.closed:
      # Append mode creates the file if missing without truncating it.
      self.lock_file = open(self.lock_file_name, 'a')
    try:
      fcntl.lockf(self.lock_file, fcntl.LOCK_EX)
    except Exception:
      # Deliberately narrowed from a bare 'except:': a bare clause would also
      # swallow SystemExit/KeyboardInterrupt when skip_fcntl_failures is True.
      if self.skip_fcntl_failures:
        Logger.exception("Fcntl call raised an exception. A lock was not aquired. "
                         "Continuing as skip_fcntl_failures is set to True")
      else:
        raise
    else:
      self.acquired = True
      Logger.info("Acquired the lock on {0}".format(self.lock_file_name))

  def unlock(self):
    """
    Unlocks the lock file descriptor and closes the lock file.

    No-op (besides logging) when the lock was never acquired, e.g. when
    fcntl failed earlier and skip_fcntl_failures suppressed the error.
    """
    if not self.enabled:
      return
    import fcntl
    Logger.info("Releasing the lock on {0}".format(self.lock_file_name))
    if self.acquired:
      try:
        fcntl.lockf(self.lock_file, fcntl.LOCK_UN)
      except Exception:
        # See blocking_lock(): narrowed from a bare 'except:'.
        if self.skip_fcntl_failures:
          Logger.exception("Fcntl call raised an exception. The lock was not released. "
                           "Continuing as skip_fcntl_failures is set to True")
        else:
          raise
      else:
        self.acquired = False
        try:
          self.lock_file.close()
          self.lock_file = None
        except IOError:
          Logger.warning("Failed to close {0}".format(self.lock_file_name))

  def __enter__(self):
    self.blocking_lock()
    # Return self (the original returned None) so callers can write
    # 'with FcntlBasedProcessLock(...) as lock:' and inspect the lock;
    # backward compatible since the context-manager protocol is unchanged.
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    self.unlock()
    # Never suppress exceptions raised inside the 'with' body.
    return False

http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
index 68fe9f9..580ab72 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
@@ -17,6 +17,8 @@ limitations under the License.
 
 """
 
+import os
+
 from ambari_commons.constants import AMBARI_SUDO_BINARY
 from resource_management.libraries.script import Script
 from resource_management.libraries.functions import default
@@ -26,9 +28,12 @@ from resource_management.libraries.functions import 
format_jvm_option
 from resource_management.libraries.functions.version import 
format_hdp_stack_version
 
 config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
 
 dfs_type = default("/commandParams/dfs_type", "")
 
+is_parallel_execution_enabled = 
int(default("/agentConfigParams/agent/parallel_execution", 0)) == 1
+
 sudo = AMBARI_SUDO_BINARY
 
 stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
@@ -89,3 +94,6 @@ has_namenode = not len(namenode_host) == 0
 
 if has_namenode or dfs_type == 'HCFS':
   hadoop_conf_dir = 
conf_select.get_hadoop_conf_dir(force_latest_on_upgrade=True)
+
+link_configs_lock_file = os.path.join(tmp_dir, "link_configs_lock_file")
+hdp_select_lock_file = os.path.join(tmp_dir, "hdp_select_lock_file")

http://git-wip-us.apache.org/repos/asf/ambari/blob/4a935ce0/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
index 8ee2f7a..1c53b51 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
@@ -24,6 +24,7 @@ from resource_management.libraries.functions import 
conf_select
 from resource_management.libraries.functions import hdp_select
 from resource_management.libraries.functions.format import format
 from resource_management.libraries.functions.version import compare_versions
+from resource_management.libraries.functions.fcntl_based_process_lock import 
FcntlBasedProcessLock
 from resource_management.libraries.resources.xml_config import XmlConfig
 from resource_management.libraries.script import Script
 
@@ -41,7 +42,10 @@ def setup_hdp_symlinks():
     # try using the exact version first, falling back in just the stack if 
it's not defined
     # which would only be during an intial cluster installation
     version = params.current_version if params.current_version is not None 
else params.stack_version_unformatted
-    hdp_select.select_all(version)
+
+    # On parallel command execution this should be executed by a single 
process at a time.
+    with FcntlBasedProcessLock(params.hdp_select_lock_file, enabled = 
params.is_parallel_execution_enabled, skip_fcntl_failures = True):
+      hdp_select.select_all(version)
 
 
 def setup_config():
@@ -85,6 +89,7 @@ def link_configs(struct_out_file):
   """
   Links configs, only on a fresh install of HDP-2.3 and higher
   """
+  import params
 
   if not Script.is_hdp_stack_greater_or_equal("2.3"):
     Logger.info("Can only link configs for HDP-2.3 and higher.")
@@ -96,5 +101,7 @@ def link_configs(struct_out_file):
     Logger.info("Could not load 'version' from {0}".format(struct_out_file))
     return
 
-  for k, v in conf_select.PACKAGE_DIRS.iteritems():
-    conf_select.convert_conf_directories_to_symlinks(k, json_version, v)
+  # On parallel command execution this should be executed by a single process 
at a time.
+  with FcntlBasedProcessLock(params.link_configs_lock_file, enabled = 
params.is_parallel_execution_enabled, skip_fcntl_failures = True):
+    for k, v in conf_select.PACKAGE_DIRS.iteritems():
+      conf_select.convert_conf_directories_to_symlinks(k, json_version, v)
\ No newline at end of file

Reply via email to