As the actual queue management has now moved to wconfd, the
python function for queue management became obsolete.

Signed-off-by: Klaus Aehlig <[email protected]>
---
 lib/jqueue/__init__.py | 107 -------------------------------------------------
 lib/server/masterd.py  |  27 -------------
 2 files changed, 134 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index ceb3e96..4d04079 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -1498,67 +1498,6 @@ class JobQueue(object):
     """
     return rpc.JobQueueRunner(self.context, address_list)
 
-  @locking.ssynchronized(_LOCK)
-  def AddNode(self, node):
-    """Register a new node with the queue.
-
-    @type node: L{objects.Node}
-    @param node: the node object to be added
-
-    """
-    node_name = node.name
-    assert node_name != self._my_hostname
-
-    # Clean queue directory on added node
-    result = self._GetRpc(None).call_jobqueue_purge(node_name)
-    msg = result.fail_msg
-    if msg:
-      logging.warning("Cannot cleanup queue directory on node %s: %s",
-                      node_name, msg)
-
-    if not node.master_candidate:
-      # remove if existing, ignoring errors
-      self._nodes.pop(node_name, None)
-      # and skip the replication of the job ids
-      return
-
-    # Upload the whole queue excluding archived jobs
-    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
-
-    # Upload current serial file
-    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
-
-    # Static address list
-    addrs = [node.primary_ip]
-
-    for file_name in files:
-      # Read file content
-      content = utils.ReadFile(file_name)
-
-      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
-                             file_name, content)
-      msg = result[node_name].fail_msg
-      if msg:
-        logging.error("Failed to upload file %s to node %s: %s",
-                      file_name, node_name, msg)
-
-    msg = result[node_name].fail_msg
-    if msg:
-      logging.error("Failed to set queue drained flag on node %s: %s",
-                    node_name, msg)
-
-    self._nodes[node_name] = node.primary_ip
-
-  @locking.ssynchronized(_LOCK)
-  def RemoveNode(self, node_name):
-    """Callback called when removing nodes from the cluster.
-
-    @type node_name: str
-    @param node_name: the name of the node to remove
-
-    """
-    self._nodes.pop(node_name, None)
-
   @staticmethod
   def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
@@ -1851,14 +1790,6 @@ class JobQueue(object):
     return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
 
   @staticmethod
-  def _FormatSubmitError(msg, ops):
-    """Formats errors which occurred while submitting a job.
-
-    """
-    return ("%s; opcodes %s" %
-            (msg, utils.CommaJoin(op.Summary() for op in ops)))
-
-  @staticmethod
   def _ResolveJobDependencies(resolve_fn, deps):
     """Resolves relative job IDs in dependencies.
 
@@ -2049,44 +1980,6 @@ class JobQueue(object):
 
     return (success, msg)
 
-  def _ArchiveJobsUnlocked(self, jobs):
-    """Archives jobs.
-
-    @type jobs: list of L{_QueuedJob}
-    @param jobs: Job objects
-    @rtype: int
-    @return: Number of archived jobs
-
-    """
-    archive_jobs = []
-    rename_files = []
-    for job in jobs:
-      assert job.writable, "Can't archive read-only job"
-      assert not job.archived, "Can't cancel archived job"
-
-      if job.CalcStatus() not in constants.JOBS_FINALIZED:
-        logging.debug("Job %s is not yet done", job.id)
-        continue
-
-      archive_jobs.append(job)
-
-      old = self._GetJobPath(job.id)
-      new = self._GetArchivedJobPath(job.id)
-      rename_files.append((old, new))
-
-    # TODO: What if 1..n files fail to rename?
-    self._RenameFilesUnlocked(rename_files)
-
-    logging.debug("Successfully archived job(s) %s",
-                  utils.CommaJoin(job.id for job in archive_jobs))
-
-    # Since we haven't quite checked, above, if we succeeded or failed renaming
-    # the files, we update the cached queue size from the filesystem. When we
-    # get around to fix the TODO: above, we can use the number of actually
-    # archived jobs to fix this.
-    self._UpdateQueueSizeUnlocked()
-    return len(archive_jobs)
-
   @locking.ssynchronized(_LOCK)
   def PrepareShutdown(self):
     """Prepare to stop the job queue.
diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index bc39353..4e31236 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -103,30 +103,3 @@ class GanetiContext(object):
   # method could be a function, but keep interface backwards compatible
   def GetRpc(self, cfg):
     return rpc.RpcRunner(cfg, lambda _: None)
-
-  def AddNode(self, cfg, node, ec_id):
-    """Adds a node to the configuration.
-
-    """
-    # Add it to the configuration
-    cfg.AddNode(node, ec_id)
-
-    # If preseeding fails it'll not be added
-    self.jobqueue.AddNode(node)
-
-  def ReaddNode(self, node):
-    """Updates a node that's already in the configuration
-
-    """
-    # Synchronize the queue again
-    self.jobqueue.AddNode(node)
-
-  def RemoveNode(self, cfg, node):
-    """Removes a node from the configuration and lock manager.
-
-    """
-    # Remove node from configuration
-    cfg.RemoveNode(node.uuid)
-
-    # Notify job queue
-    self.jobqueue.RemoveNode(node.name)
-- 
2.2.0.rc0.207.ga3a616c

Reply via email to