As masterd does not exist any more, there is no need
the jobs as processes to support all the luxi operations.
So remove support for all that are not potentially needed,
i.e., all but picking up a job and changing its priority.

Signed-off-by: Klaus Aehlig <[email protected]>
---
 lib/jqueue/__init__.py |  17 -----
 lib/server/masterd.py  | 182 +------------------------------------------------
 2 files changed, 3 insertions(+), 196 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index 370e957..b381f52 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -1816,23 +1816,6 @@ class JobQueue(object):
     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
 
   @classmethod
-  def SubmitJob(cls, ops):
-    """Create and store a new job.
-
-    """
-    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
-
-  @classmethod
-  def SubmitJobToDrainedQueue(cls, ops):
-    """Forcefully create and store a new job.
-
-    Do so, even if the job queue is drained.
-
-    """
-    return luxi.Client(address=pathutils.QUERY_SOCKET)\
-        .SubmitJobToDrainedQueue(ops)
-
-  @classmethod
   def SubmitManyJobs(cls, jobs):
     """Create and store multiple jobs.
 
diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index ee1ca77..b056584 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -40,8 +40,6 @@ import logging
 from ganeti import config
 from ganeti import constants
 from ganeti import daemon
-from ganeti import mcpu
-from ganeti import opcodes
 from ganeti import jqueue
 from ganeti import luxi
 import ganeti.rpc.errors as rpcerr
@@ -50,9 +48,6 @@ from ganeti import errors
 from ganeti import workerpool
 import ganeti.rpc.node as rpc
 import ganeti.rpc.client as rpccl
-from ganeti import objects
-from ganeti import query
-from ganeti import runtime
 from ganeti import ht
 
 
@@ -266,15 +261,6 @@ class ClientOps(object):
     self.server = server
 
   @staticmethod
-  def _SubmitJob(args, queue):
-    logging.info("Receiving new job")
-    (job_def, ) = args
-    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
-    job_id = queue.SubmitJob(ops)
-    _LogNewJob(True, job_id, ops)
-    return job_id
-
-  @staticmethod
   def _PickupJob(args, queue):
     logging.info("Picking up new job from queue")
     (job_id, ) = args
@@ -282,132 +268,12 @@ class ClientOps(object):
     return job_id
 
   @staticmethod
-  def _SubmitJobToDrainedQueue(args, queue):
-    logging.info("Forcefully receiving new job")
-    (job_def, ) = args
-    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
-    job_id = queue.SubmitJobToDrainedQueue(ops)
-    _LogNewJob(True, job_id, ops)
-    return job_id
-
-  @staticmethod
-  def _SubmitManyJobs(args, queue):
-    logging.info("Receiving multiple jobs")
-    (job_defs, ) = args
-    jobs = []
-    for ops in job_defs:
-      jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-    job_ids = queue.SubmitManyJobs(jobs)
-    for ((status, job_id), ops) in zip(job_ids, jobs):
-      _LogNewJob(status, job_id, ops)
-    return job_ids
-
-  @staticmethod
-  def _CancelJob(args, queue):
-    (job_id, ) = args
-    logging.info("Received job cancel request for %s", job_id)
-    return queue.CancelJob(job_id)
-
-  @staticmethod
   def _ChangeJobPriority(args, queue):
     (job_id, priority) = args
     logging.info("Received request to change priority for job %s to %s",
                  job_id, priority)
     return queue.ChangeJobPriority(job_id, priority)
 
-  @staticmethod
-  def _ArchiveJob(args, queue):
-    (job_id, ) = args
-    logging.info("Received job archive request for %s", job_id)
-    return queue.ArchiveJob(job_id)
-
-  @staticmethod
-  def _AutoArchiveJobs(args, queue):
-    (age, timeout) = args
-    logging.info("Received job autoarchive request for age %s, timeout %s",
-                 age, timeout)
-    return queue.AutoArchiveJobs(age, timeout)
-
-  @staticmethod
-  def _WaitForJobChange(args, queue):
-    (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
-    logging.info("Received job poll request for %s", job_id)
-    return queue.WaitForJobChanges(job_id, fields, prev_job_info,
-                                   prev_log_serial, timeout)
-
-  def _PerformQuery(self, args, queue):
-    (what, fields, qfilter) = args
-
-    if what in constants.QR_VIA_OP:
-      result = self._Query(opcodes.OpQuery(what=what, fields=fields,
-                                           qfilter=qfilter))
-    elif what == constants.QR_LOCK:
-      raise errors.OpPrereqError("Lock queries cannot be asked to jobs")
-    elif what == constants.QR_JOB:
-      result = queue.QueryJobs(fields, qfilter)
-    elif what in constants.QR_VIA_LUXI:
-      luxi_client = runtime.GetClient()
-      result = luxi_client.Query(what, fields, qfilter).ToDict()
-    else:
-      raise errors.OpPrereqError("Resource type '%s' unknown" % what,
-                                 errors.ECODE_INVAL)
-
-    return result
-
-  @staticmethod
-  def _QueryFields(args):
-    (what, fields) = args
-    req = objects.QueryFieldsRequest(what=what, fields=fields)
-
-    try:
-      fielddefs = query.ALL_FIELDS[req.what]
-    except KeyError:
-      raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
-                                 errors.ECODE_INVAL)
-
-    return query.QueryFields(fielddefs, req.fields)
-
-  @staticmethod
-  def _QueryJobs(args, queue):
-    (job_ids, fields) = args
-    if isinstance(job_ids, (tuple, list)) and job_ids:
-      msg = utils.CommaJoin(job_ids)
-    else:
-      msg = str(job_ids)
-    logging.info("Received job query request for %s", msg)
-    return queue.OldStyleQueryJobs(job_ids, fields)
-
-  def _QueryConfigValues(self, args):
-    (fields, ) = args
-    logging.info("Received config values query request for %s", fields)
-    op = opcodes.OpClusterConfigQuery(output_fields=fields)
-    return self._Query(op)
-
-  def _QueryClusterInfo(self):
-    logging.info("Received cluster info query request")
-    op = opcodes.OpClusterQuery()
-    return self._Query(op)
-
-  def _QueryTags(self, args):
-    (kind, name) = args
-    logging.info("Received tags query request")
-    op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
-    return self._Query(op)
-
-  @staticmethod
-  def _SetDrainFlag(args, queue):
-    (drain_flag, ) = args
-    logging.info("Received queue drain flag change request to %s",
-                 drain_flag)
-    return queue.SetDrainFlag(drain_flag)
-
-  @staticmethod
-  def _SetWatcherPause(args, context):
-    (until, ) = args
-    # FIXME!
-    ec_id = None
-    return _SetWatcherPause(context, ec_id, until)
-
   def handle_request(self, method, args): # pylint: disable=R0911
     context = self.server.context
     queue = context.jobqueue
@@ -422,58 +288,16 @@ class ClientOps(object):
       raise ValueError("Invalid operation '%s'" % method)
 
     job_id = None
-    if method == luxi.REQ_SUBMIT_JOB:
-      job_id = self._SubmitJob(args, queue)
-    elif method == luxi.REQ_PICKUP_JOB:
+    if method == luxi.REQ_PICKUP_JOB:
       job_id = self._PickupJob(args, queue)
-    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
-      job_id = self._SubmitJobToDrainedQueue(args, queue)
-    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
-      job_id = self._SubmitManyJobs(args, queue)
-    elif method == luxi.REQ_CANCEL_JOB:
-      job_id = self._CancelJob(args, queue)
     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
       job_id = self._ChangeJobPriority(args, queue)
-    elif method == luxi.REQ_ARCHIVE_JOB:
-      job_id = self._ArchiveJob(args, queue)
-    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
-      job_id = self._AutoArchiveJobs(args, queue)
-    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
-      job_id = self._WaitForJobChange(args, queue)
-    elif method == luxi.REQ_QUERY:
-      job_id = self._PerformQuery(args, queue)
-    elif method == luxi.REQ_QUERY_FIELDS:
-      job_id = self._QueryFields(args)
-    elif method == luxi.REQ_QUERY_JOBS:
-      job_id = self._QueryJobs(args, queue)
-    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
-      job_id = self._QueryConfigValues(args)
-    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
-      job_id = self._QueryClusterInfo()
-    elif method == luxi.REQ_QUERY_TAGS:
-      job_id = self._QueryTags(args)
-    elif method == luxi.REQ_SET_DRAIN_FLAG:
-      job_id = self._SetDrainFlag(args, queue)
-    elif method == luxi.REQ_SET_WATCHER_PAUSE:
-      job_id = self._SetWatcherPause(args, context)
     else:
-      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
-      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
-                                   " but not implemented" % method)
+      logging.info("Request '%s' not supported by masterd", method)
+      raise ValueError("Unsupported operation '%s'" % method)
 
     return job_id
 
-  def _Query(self, op):
-    """Runs the specified opcode and returns the result.
-
-    """
-    # Queries don't have a job id
-    proc = mcpu.Processor(self.server.context, None, enable_locks=False)
-
-    # TODO: Executing an opcode using locks will acquire them in blocking mode.
-    # Consider using a timeout for retries.
-    return proc.ExecOpCode(op, None)
-
 
 class GanetiContext(object):
   """Context common to all ganeti threads.
-- 
2.0.0.526.g5318336

Reply via email to