ganeti-masterd: CheckAgreement() uses RPC to query other nodes and
therefore needs threads. We must fork before calling it; otherwise we
would fork while already using threads. Add initialization and shutdown
of the RPC pool.

ganeti.cli: Add decorator function to initialize and shutdown RPC pool.

ganeti.rpc: Add functions to initialize and shutdown RPC pool. Throw
exception when used without proper initialization.

gnt-cluster, gnt-node: Use decorator function to initialize and shutdown
RPC pool.
---
 daemons/ganeti-masterd |   47 +++++++++++++++++++++++++----------------------
 lib/cli.py             |   12 ++++++++++++
 lib/rpc.py             |   29 +++++++++++++++++++++++------
 scripts/gnt-cluster    |    3 +++
 scripts/gnt-node       |    1 +
 5 files changed, 64 insertions(+), 28 deletions(-)

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index d50909d..39c0c4a 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -428,37 +428,40 @@ def main():
   utils.debug = options.debug
   utils.no_fork = True
 
-  ssconf.CheckMaster(options.debug)
-
-  # we believe we are the master, let's ask the other nodes...
-  if not CheckAgreement():
-    return
-
-  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
-
   # become a daemon
   if options.fork:
-    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
-                    noclose_fds=[master.fileno()])
+    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)
 
   utils.WritePidFile(constants.MASTERD_PID)
 
-  utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
-                     stderr_logging=not options.fork)
+  rpc.Init()
+  try:
+    ssconf.CheckMaster(options.debug)
 
-  logging.info("ganeti master daemon startup")
+    # we believe we are the master, let's ask the other nodes...
+    if not CheckAgreement():
+      return
 
-  # activate ip
-  master_node = ssconf.SimpleConfigReader().GetMasterNode()
-  if not rpc.RpcRunner.call_node_start_master(master_node, False):
-    logging.error("Can't activate master IP address")
+    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
 
-  master.setup_queue()
-  try:
-    master.serve_forever()
+    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
+                       stderr_logging=not options.fork)
+
+    logging.info("ganeti master daemon startup")
+
+    # activate ip
+    master_node = ssconf.SimpleConfigReader().GetMasterNode()
+    if not rpc.RpcRunner.call_node_start_master(master_node, False):
+      logging.error("Can't activate master IP address")
+
+    master.setup_queue()
+    try:
+      master.serve_forever()
+    finally:
+      master.server_cleanup()
+      utils.RemovePidFile(constants.MASTERD_PID)
   finally:
-    master.server_cleanup()
-    utils.RemovePidFile(constants.MASTERD_PID)
+    rpc.Shutdown()
 
 
 if __name__ == "__main__":
diff --git a/lib/cli.py b/lib/cli.py
index b95b6ae..de649a4 100644
--- a/lib/cli.py
+++ b/lib/cli.py
@@ -36,6 +36,7 @@ from ganeti import constants
 from ganeti import opcodes
 from ganeti import luxi
 from ganeti import ssconf
+from ganeti import rpc
 
 from optparse import (OptionParser, make_option, TitledHelpFormatter,
                       Option, OptionValueError)
@@ -51,6 +52,7 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
            "ValidateBeParams",
            "ToStderr", "ToStdout",
+           "UsesRPC",
            ]
 
 
@@ -424,6 +426,16 @@ def ValidateBeParams(bep):
       raise errors.ParameterError("Invalid number of VCPUs")
 
 
+def UsesRPC(fn):
+  def wrapper(*args, **kwargs):
+    rpc.Init()
+    try:
+      return fn(*args, **kwargs)
+    finally:
+      rpc.Shutdown()
+  return wrapper
+
+
 def AskUser(text, choices=None):
   """Ask the user a question.
 
diff --git a/lib/rpc.py b/lib/rpc.py
index 9c3d62f..3c8042a 100644
--- a/lib/rpc.py
+++ b/lib/rpc.py
@@ -41,6 +41,26 @@ from ganeti import http
 from ganeti import serializer
 
 
+# Module level variable
+_http_manager = None
+
+
+def Init():
+  global _http_manager
+
+  assert not _http_manager, "RPC module initialized more than once"
+
+  _http_manager = http.HttpClientManager()
+
+
+def Shutdown():
+  global _http_manager
+
+  if _http_manager:
+    _http_manager.Shutdown()
+    _http_manager = None
+
+
 class Client:
   """RPC Client class.
 
@@ -103,12 +123,9 @@ class Client:
     @returns: List of RPC results
 
     """
-    # TODO: Shared and reused manager
-    mgr = http.HttpClientManager()
-    try:
-      mgr.ExecRequests(self.nc.values())
-    finally:
-      mgr.Shutdown()
+    assert _http_manager, "RPC module not intialized"
+
+    _http_manager.ExecRequests(self.nc.values())
 
     results = {}
 
diff --git a/scripts/gnt-cluster b/scripts/gnt-cluster
index 9ebd10f..fcf0fc9 100755
--- a/scripts/gnt-cluster
+++ b/scripts/gnt-cluster
@@ -36,6 +36,7 @@ from ganeti import bootstrap
 from ganeti import ssh
 
 
+@UsesRPC
 def InitCluster(opts, args):
   """Initialize the cluster.
 
@@ -125,6 +126,7 @@ def InitCluster(opts, args):
   return 0
 
 
+@UsesRPC
 def DestroyCluster(opts, args):
   """Destroy the cluster.
 
@@ -396,6 +398,7 @@ def VerifyDisks(opts, args):
   return retcode
 
 
+@UsesRPC
 def MasterFailover(opts, args):
   """Failover the master node.
 
diff --git a/scripts/gnt-node b/scripts/gnt-node
index 1f45fa5..12372ec 100755
--- a/scripts/gnt-node
+++ b/scripts/gnt-node
@@ -42,6 +42,7 @@ _LIST_DEF_FIELDS = [
   ]
 
 
+@UsesRPC
 def AddNode(opts, args):
   """Add a node to the cluster.
 
-- 
1.6.0.3

Reply via email to