This backend daemon for instance import and export will be used to
transfer instance data to other machines. It is implemented in a generic
way to support different ways of data input and output. The third-party
program “socat”, which is already used by the KVM hypervisor abstraction,
is used to connect to remote machines using SSL/TLS. After starting the
child processes in a separate process group, the import/export daemon
monitors their output and updates a status file regularily. This status
file can then be read by ganeti-noded (not in this patch).

Three I/O methods are supported: Raw disk, file and script. Each of these
can be used for import and export.

Similar to daemon-util, an incomplete set of tests written in Bash is
included.

Two future enhancements are planned:
- Run parts of the command chain as a dedicated user (privilege
  separation).
- Currently users of this daemon have to poll the status file while data
  is transferred. This is inefficient and creates unnecessary delays. By
  adding “dd” into the chain and sending it SIGUSR1 regularily, we can get
  some statistics, optimize the polling frequenc and even provide the user
  with an ETA (which isn't available with all current methods to
  import/export instance data).

Signed-off-by: Michael Hanselmann <[email protected]>
---
 Makefile.am                        |   14 +-
 daemons/import-export              |  645 ++++++++++++++++++++++++++++++++++++
 epydoc.conf                        |    2 +-
 lib/constants.py                   |    6 +
 lib/objects.py                     |   11 +
 test/import-export_unittest-helper |   77 +++++
 test/import-export_unittest.bash   |  196 +++++++++++
 7 files changed, 948 insertions(+), 3 deletions(-)
 create mode 100755 daemons/import-export
 create mode 100755 test/import-export_unittest-helper
 create mode 100755 test/import-export_unittest.bash

diff --git a/Makefile.am b/Makefile.am
index 32677cd..817e9e8 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -239,8 +239,12 @@ dist_tools_SCRIPTS = \
        tools/cluster-merge \
        tools/lvmstrap
 
+pkglib_python_scripts = \
+       daemons/import-export
+
 pkglib_SCRIPTS = \
-       daemons/daemon-util
+       daemons/daemon-util \
+       $(pkglib_python_scripts)
 
 EXTRA_DIST = \
        NEWS \
@@ -254,6 +258,7 @@ EXTRA_DIST = \
        $(RUN_IN_TEMPDIR) \
        daemons/daemon-util.in \
        daemons/ganeti-cleaner.in \
+       $(pkglib_python_scripts) \
        devel/upload.in \
        $(docdot) \
        $(docpng) \
@@ -320,7 +325,8 @@ TEST_FILES = \
        test/data/cert1.pem \
        test/data/proc_drbd8.txt \
        test/data/proc_drbd80-emptyline.txt \
-       test/data/proc_drbd83.txt
+       test/data/proc_drbd83.txt \
+       test/import-export_unittest-helper
 
 python_tests = \
        test/ganeti.backend_unittest.py \
@@ -349,6 +355,7 @@ python_tests = \
 
 dist_TESTS = \
        test/daemon-util_unittest.bash \
+       test/import-export_unittest.bash \
        $(python_tests)
 
 nodist_TESTS =
@@ -366,6 +373,7 @@ TESTS_ENVIRONMENT = \
 all_python_code = \
        $(dist_sbin_SCRIPTS) \
        $(dist_tools_SCRIPTS) \
+       $(pkglib_python_scripts) \
        $(python_tests) \
        $(pkgpython_PYTHON) \
        $(hypervisor_PYTHON) \
@@ -377,6 +385,7 @@ all_python_code = \
 srclink_files = \
        man/footer.sgml \
        test/daemon-util_unittest.bash \
+       test/import-export_unittest.bash \
        $(all_python_code)
 
 check_python_code = \
@@ -387,6 +396,7 @@ lint_python_code = \
        ganeti \
        $(dist_sbin_SCRIPTS) \
        $(dist_tools_SCRIPTS) \
+       $(pkglib_python_scripts) \
        $(BUILD_BASH_COMPLETION)
 
 test/daemon-util_unittest.bash: daemons/daemon-util
diff --git a/daemons/import-export b/daemons/import-export
new file mode 100755
index 0000000..0d2520e
--- /dev/null
+++ b/daemons/import-export
@@ -0,0 +1,645 @@
+#!/usr/bin/python
+#
+
+# Copyright (C) 2010 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Import/export daemon.
+
+"""
+
+# pylint: disable-msg=C0103
+# C0103: Invalid name import-export
+
+import errno
+import logging
+import optparse
+import os
+import re
+import select
+import signal
+import socket
+import subprocess
+import sys
+import time
+from cStringIO import StringIO
+
+from ganeti import constants
+from ganeti import cli
+from ganeti import utils
+from ganeti import serializer
+from ganeti import objects
+from ganeti import locking
+
+
+#: Used to recognize point at which socat(1) starts to listen on its socket.
+#: The local address is required for the remote peer to connect (in particular
+#: the port number).
+LISTENING_RE = re.compile(r"^listening on\s+"
+                          r"AF=(?P<family>\d+)\s+"
+                          r"(?P<address>.+):(?P<port>\d+)$", re.I)
+
+#: Used to recognize point at which socat(1) is sending data over the wire
+TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
+                              re.I)
+
+SOCAT_LOG_DEBUG = "D"
+SOCAT_LOG_INFO = "I"
+SOCAT_LOG_NOTICE = "N"
+SOCAT_LOG_WARNING = "W"
+SOCAT_LOG_ERROR = "E"
+SOCAT_LOG_FATAL = "F"
+
+SOCAT_LOG_IGNORE = frozenset([
+  SOCAT_LOG_DEBUG,
+  SOCAT_LOG_INFO,
+  SOCAT_LOG_NOTICE,
+  ])
+
+#: Socat buffer size: at most this many bytes are transferred per step
+SOCAT_BUFSIZE = 1024 * 1024
+
+#: How many lines to keep in the status file
+MAX_RECENT_OUTPUT_LINES = 20
+
+#: Don't update status file more than once every 5 seconds (unless forced)
+MIN_UPDATE_INTERVAL = 5.0
+
+#: Give child process up to 5 seconds to exit after sending a signal
+CHILD_LINGER_TIMEOUT = 5.0
+
+# Common options for socat
+SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
+SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"]
+SOCAT_CONNECT_TIMEOUT = 60
+
+
+# Global variable for options
+options = None
+
+
+class Error(Exception):
+  """Generic exception"""
+
+
+def SetupLogging():
+  """Configures the logging module.
+
+  """
+  formatter = logging.Formatter("%(asctime)s: %(message)s")
+
+  stderr_handler = logging.StreamHandler()
+  stderr_handler.setFormatter(formatter)
+  stderr_handler.setLevel(logging.NOTSET)
+
+  root_logger = logging.getLogger("")
+  root_logger.addHandler(stderr_handler)
+
+  if options.debug:
+    root_logger.setLevel(logging.NOTSET)
+  elif options.verbose:
+    root_logger.setLevel(logging.INFO)
+  else:
+    root_logger.setLevel(logging.ERROR)
+
+  # Create special logger for child process output
+  child_logger = logging.Logger("child output")
+  child_logger.addHandler(stderr_handler)
+  child_logger.setLevel(logging.NOTSET)
+
+  return child_logger
+
+
+def _VerifyListening(family, address, port):
+  """Verify address given as listening address by socat.
+
+  """
+  # TODO: Implement IPv6 support
+  if family != socket.AF_INET:
+    raise Error("Address family %r not supported" % family)
+
+  try:
+    packed_address = socket.inet_pton(family, address)
+  except socket.error:
+    raise Error("Invalid address %r for family %s" % (address, family))
+
+  return (socket.inet_ntop(family, packed_address), port)
+
+
+class StatusFile:
+  """Status file manager.
+
+  """
+  def __init__(self, path):
+    """Initializes class.
+
+    """
+    self._path = path
+    self._data = objects.ImportExportStatus(ctime=time.time(),
+                                            mtime=None,
+                                            recent_output=[])
+
+  def AddRecentOutput(self, line):
+    """Adds a new line of recent output.
+
+    """
+    self._data.recent_output.append(line)
+
+    # Remove old lines
+    del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES]
+
+  def SetListenPort(self, port):
+    """Sets the port the daemon is listening on.
+
+    @type port: int
+    @param port: TCP/UDP port
+
+    """
+    assert isinstance(port, (int, long)) and 0 < port < 2**16
+    self._data.listen_port = port
+
+  def GetListenPort(self):
+    """Returns the port the daemon is listening on.
+
+    """
+    return self._data.listen_port
+
+  def SetConnected(self):
+    """Sets the connected flag.
+
+    """
+    self._data.connected = True
+
+  def SetExitStatus(self, exit_status, error_message):
+    """Sets the exit status and an error message.
+
+    """
+    # Require error message when status isn't 0
+    assert exit_status == 0 or error_message
+
+    self._data.exit_status = exit_status
+    self._data.error_message = error_message
+
+  def ExitStatusIsSuccess(self):
+    """Returns whether the exit status means "success".
+
+    """
+    return not bool(self._data.error_message)
+
+  def Update(self, force):
+    """Updates the status file.
+
+    @type force: bool
+    @param force: Write status file in any case, not only when minimum interval
+                  is expired
+
+    """
+    if not (force or
+            self._data.mtime is None or
+            time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)):
+      return
+
+    logging.debug("Updating status file %s", self._path)
+
+    self._data.mtime = time.time()
+    utils.WriteFile(self._path,
+                    data=serializer.DumpJson(self._data.ToDict(), indent=True),
+                    mode=0400)
+
+
+def _ProcessSocatOutput(status_file, level, msg):
+  """Interprets socat log output.
+
+  """
+  if level == SOCAT_LOG_NOTICE:
+    if status_file.GetListenPort() is None:
+      # TODO: Maybe implement timeout to not listen forever
+      m = LISTENING_RE.match(msg)
+      if m:
+        (_, port) = _VerifyListening(int(m.group("family")), 
m.group("address"),
+                                     int(m.group("port")))
+
+        status_file.SetListenPort(port)
+        return True
+
+    m = TRANSFER_LOOP_RE.match(msg)
+    if m:
+      status_file.SetConnected()
+      return True
+
+  return False
+
+
+def ProcessOutput(line, status_file, logger, socat):
+  """Takes care of child process output.
+
+  @param status_file: Status file manager
+  @param logger: Child output logger
+  @type socat: bool
+  @param socat: Whether it's a socat output line
+  @type line: string
+  @param line: Child output line
+
+  """
+  force_update = False
+  forward_line = line
+
+  if socat:
+    level = None
+    parts = line.split(None, 4)
+
+    if len(parts) == 5:
+      (_, _, _, level, msg) = parts
+
+      force_update = _ProcessSocatOutput(status_file, level, msg)
+
+      if options.debug or (level and level not in SOCAT_LOG_IGNORE):
+        forward_line = "socat: %s %s" % (level, msg)
+      else:
+        forward_line = None
+    else:
+      forward_line = "socat: %s" % line
+
+  if forward_line:
+    logger.info(forward_line)
+    status_file.AddRecentOutput(forward_line)
+
+  status_file.Update(force_update)
+
+
+def GetBashCommand(cmd):
+  """Prepares a command to be run in Bash.
+
+  """
+  return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
+
+
+def GetSocatCommand(mode):
+  """Returns the socat command.
+
+  """
+  common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
+    "key=%s" % options.key,
+    "cert=%s" % options.cert,
+    "cafile=%s" % options.ca,
+    ]
+
+  if options.bind is not None:
+    common_addr_opts.append("bind=%s" % options.bind)
+
+  if mode == constants.IEM_IMPORT:
+    if options.port is None:
+      port = 0
+    else:
+      port = options.port
+
+    addr1 = [
+      "OPENSSL-LISTEN:%s" % port,
+      "reuseaddr",
+      ] + common_addr_opts
+    addr2 = ["stdout"]
+
+  elif mode == constants.IEM_EXPORT:
+    addr1 = ["stdin"]
+    addr2 = [
+      "OPENSSL:%s:%s" % (options.host, options.port),
+      "connect-timeout=%s" % SOCAT_CONNECT_TIMEOUT,
+      ] + common_addr_opts
+
+  else:
+    raise Error("Invalid mode")
+
+  for i in [addr1, addr2]:
+    for value in i:
+      if "," in value:
+        raise Error("Comma not allowed in socat option value: %r" % value)
+
+  return [
+    constants.SOCAT_PATH,
+
+    # Log to stderr
+    "-ls",
+
+    # Log level
+    "-d", "-d",
+
+    # Buffer size
+    "-b%s" % SOCAT_BUFSIZE,
+
+    # Unidirectional mode, the first address is only used for reading, and the
+    # second address is only used for writing
+    "-u",
+
+    ",".join(addr1), ",".join(addr2)
+    ]
+
+
+def GetTransportCommand(mode, socat_stderr_fd):
+  """Returns the command for the transport part of the daemon.
+
+  @param mode: Daemon mode (import or export)
+  @type socat_stderr_fd: int
+  @param socat_stderr_fd: File descriptor socat should write its stderr to
+
+  """
+  socat_cmd = ("%s 2>&%d" %
+               (utils.ShellQuoteArgs(GetSocatCommand(mode)),
+                socat_stderr_fd))
+
+  if mode == constants.IEM_IMPORT:
+    transport_cmd = "%s | gunzip -c" % socat_cmd
+  elif mode == constants.IEM_EXPORT:
+    transport_cmd = "gzip -c | %s" % socat_cmd
+  else:
+    raise Error("Invalid mode")
+
+  # TODO: Use "dd" to measure processed data (allows to give an ETA)
+  # TODO: If a connection to socat is dropped (e.g. due to a wrong
+  # certificate), socat should be restarted
+
+  # TODO: Run transport as separate user
+  # The transport uses its own shell to simplify running it as a separate user
+  # in the future.
+  return GetBashCommand(transport_cmd)
+
+
+def GetCommand(mode, socat_stderr_fd):
+  """Returns the complete child process command.
+
+  """
+  buf = StringIO()
+
+  if options.cmd_prefix:
+    buf.write(options.cmd_prefix)
+    buf.write(" ")
+
+  buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd)))
+
+  if options.cmd_suffix:
+    buf.write(" ")
+    buf.write(options.cmd_suffix)
+
+  return GetBashCommand(buf.getvalue())
+
+
+def ProcessChildIO(child, socat_stderr_read, status_file, child_logger,
+                   signal_notify, signal_handler):
+  """Handles the child processes' output.
+
+  """
+  poller = select.poll()
+
+  script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
+                                           child_logger, False)
+  try:
+    socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file,
+                                            child_logger, True)
+    try:
+      fdmap = {
+        child.stderr.fileno(): (child.stderr, script_stderr_lines),
+        socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines),
+        signal_notify.fileno(): (signal_notify, None),
+        }
+
+      for fd in fdmap:
+        utils.SetNonblockFlag(fd, True)
+        poller.register(fd, select.POLLIN)
+
+      timeout_calculator = None
+      while True:
+        # Break out of loop if only signal notify FD is left
+        if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
+          break
+
+        if timeout_calculator:
+          timeout = timeout_calculator.Remaining() * 1000
+          if timeout < 0:
+            logging.info("Child process didn't exit in time")
+            break
+        else:
+          timeout = None
+
+        for fd, event in utils.RetryOnSignal(poller.poll, timeout):
+          if event & (select.POLLIN | event & select.POLLPRI):
+            (from_, to) = fdmap[fd]
+
+            # Read up to 1 KB of data
+            data = from_.read(1024)
+            if data:
+              if to:
+                to.write(data)
+              elif fd == signal_notify.fileno():
+                # Signal handling
+                if signal_handler.called:
+                  signal_handler.Clear()
+                  if timeout_calculator:
+                    logging.info("Child process still has about %0.2f seconds"
+                                 " exit", timeout_calculator.Remaining())
+                  else:
+                    logging.info("Giving child process %0.2f seconds to exit",
+                                 CHILD_LINGER_TIMEOUT)
+                    timeout_calculator = \
+                      locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True)
+            else:
+              poller.unregister(fd)
+              del fdmap[fd]
+
+          elif event & (select.POLLNVAL | select.POLLHUP |
+                        select.POLLERR):
+            poller.unregister(fd)
+            del fdmap[fd]
+
+        script_stderr_lines.flush()
+        socat_stderr_lines.flush()
+
+      # If there was a timeout calculator, we were waiting for the child to
+      # finish, e.g. due to a signal
+      return not bool(timeout_calculator)
+    finally:
+      socat_stderr_lines.close()
+  finally:
+    script_stderr_lines.close()
+
+
+def ParseOptions():
+  """Parses the options passed to the program.
+
+  @return: Arguments to program
+
+  """
+  global options # pylint: disable-msg=W0603
+
+  parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" %
+                                        (constants.IEM_IMPORT,
+                                         constants.IEM_EXPORT)))
+  parser.add_option(cli.DEBUG_OPT)
+  parser.add_option(cli.VERBOSE_OPT)
+  parser.add_option("--key", dest="key", action="store", type="string",
+                    help="RSA key file")
+  parser.add_option("--cert", dest="cert", action="store", type="string",
+                    help="X509 certificate file")
+  parser.add_option("--ca", dest="ca", action="store", type="string",
+                    help="X509 CA file")
+  parser.add_option("--bind", dest="bind", action="store", type="string",
+                    help="Bind address")
+  parser.add_option("--host", dest="host", action="store", type="string",
+                    help="Remote hostname")
+  parser.add_option("--port", dest="port", action="store", type="int",
+                    help="Remote port")
+  parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store",
+                    type="string", help="Command prefix")
+  parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store",
+                    type="string", help="Command suffix")
+
+  (options, args) = parser.parse_args()
+
+  if len(args) != 2:
+    # Won't return
+    parser.error("Expected exactly two arguments")
+
+  (status_file_path, mode) = args
+
+  if mode not in (constants.IEM_IMPORT,
+                  constants.IEM_EXPORT):
+    # Won't return
+    parser.error("Invalid mode: %s" % mode)
+
+  return (status_file_path, mode)
+
+
+def main():
+  """Main function.
+
+  """
+  # Option parsing
+  (status_file_path, mode) = ParseOptions()
+
+  # Configure logging
+  child_logger = SetupLogging()
+
+  status_file = StatusFile(status_file_path)
+  try:
+    try:
+      # Pipe to receive socat's stderr output
+      (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe()
+
+      # Pipe to notify on signals
+      (signal_notify_read_fd, signal_notify_write_fd) = os.pipe()
+
+      # Configure signal module's notifier
+      try:
+        # This is only supported in Python 2.5 and above (some distributions
+        # backported it to Python 2.4)
+        set_wakeup_fd_fn = signal.set_wakeup_fd
+      except AttributeError:
+        pass
+      else:
+        set_wakeup_fd_fn(signal_notify_write_fd)
+
+      # Buffer size 0 is important, otherwise .read() with a specified length
+      # might buffer data while poll(2) won't mark its file descriptor as
+      # readable again.
+      socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
+      signal_notify_read = os.fdopen(signal_notify_read_fd, "r", 0)
+
+      # Get child process command
+      cmd = GetCommand(mode, socat_stderr_write_fd)
+
+      logging.debug("Starting command %r", cmd)
+
+      def _ChildPreexec():
+        # Move child to separate process group. By sending a signal to its
+        # process group we can kill the child process and all its own
+        # child-processes.
+        os.setpgid(0, 0)
+
+        # Close almost all file descriptors
+        utils.CloseFDs(noclose_fds=[socat_stderr_write_fd])
+
+      # Not using close_fds because doing so would also close the socat stderr
+      # pipe, which we still need.
+      child = subprocess.Popen(cmd, shell=False, close_fds=False,
+                               stderr=subprocess.PIPE, stdout=None, stdin=None,
+                               preexec_fn=_ChildPreexec)
+      try:
+        # Avoid race condition by setting child's process group (as good as
+        # possible in Python) before sending signals to child. For an
+        # explanation, see preexec function for child.
+        try:
+          os.setpgid(child.pid, child.pid)
+        except EnvironmentError, err:
+          # If the child process was faster we receive EPERM or EACCES
+          if err.errno not in (errno.EPERM, errno.EACCES):
+            raise
+
+        # Forward signals to child process
+        def _ForwardSignal(signum, _):
+          # Wake up poll(2)
+          os.write(signal_notify_write_fd, "\0")
+
+          # Send signal to child
+          os.killpg(child.pid, signum)
+
+        # TODO: There is a race condition between starting the child and
+        # handling the signals here. While there might be a way to work around
+        # it by registering the handlers before starting the child and
+        # deferring sent signals until the child is available, doing so can be
+        # complicated.
+        signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT],
+                                             handler_fn=_ForwardSignal)
+        try:
+          # Close child's side
+          utils.RetryOnSignal(os.close, socat_stderr_write_fd)
+
+          if ProcessChildIO(child, socat_stderr_read, status_file, 
child_logger,
+                            signal_notify_read, signal_handler):
+            # The child closed all its file descriptors and there was no signal
+            # TODO: Implement timeout instead of waiting indefinitely
+            utils.RetryOnSignal(child.wait)
+        finally:
+          signal_handler.Reset()
+      finally:
+        # Final check if child process is still alive
+        if utils.RetryOnSignal(child.poll) is None:
+          logging.error("Child process still alive, sending SIGKILL")
+          os.killpg(child.pid, signal.SIGKILL)
+          utils.RetryOnSignal(child.wait)
+
+      if child.returncode == 0:
+        errmsg = None
+      elif child.returncode < 0:
+        errmsg = "Exited due to signal %s" % (-child.returncode, )
+      else:
+        errmsg = "Exited with status %s" % (child.returncode, )
+
+      status_file.SetExitStatus(child.returncode, errmsg)
+    except Exception, err: # pylint: disable-msg=W0703
+      logging.exception("Unhandled error occurred")
+      status_file.SetExitStatus(constants.EXIT_FAILURE,
+                                "Unhandled error occurred: %s" % (err, ))
+
+    if status_file.ExitStatusIsSuccess():
+      sys.exit(constants.EXIT_SUCCESS)
+
+    sys.exit(constants.EXIT_FAILURE)
+  finally:
+    status_file.Update(True)
+
+
+if __name__ == "__main__":
+  main()
diff --git a/epydoc.conf b/epydoc.conf
index 11069a3..7f27503 100644
--- a/epydoc.conf
+++ b/epydoc.conf
@@ -8,7 +8,7 @@ output: html
 # note: the wildcards means the directories should be cleaned up after each
 # run, otherwise there will be stale '*c' (compiled) files that will not be
 # parsable and will break the epydoc run
-modules: ganeti, scripts/gnt-*, daemons/ganeti-confd, daemons/ganeti-masterd, 
daemons/ganeti-noded, daemons/ganeti-rapi, daemons/ganeti-watcher
+modules: ganeti, scripts/gnt-*, daemons/ganeti-confd, daemons/ganeti-masterd, 
daemons/ganeti-noded, daemons/ganeti-rapi, daemons/ganeti-watcher, 
daemons/import-export
 
 graph: all
 
diff --git a/lib/constants.py b/lib/constants.py
index 4d0179b..b38a5bd 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -193,6 +193,12 @@ X509_CERT_SIGN_DIGEST = "SHA1"
 
 X509_CERT_SIGNATURE_HEADER = "X-Ganeti-Signature"
 
+IMPORT_EXPORT_DAEMON = _autoconf.PKGLIBDIR + "/import-export"
+
+# Import/export daemon mode
+IEM_IMPORT = "import"
+IEM_EXPORT = "export"
+
 VALUE_DEFAULT = "default"
 VALUE_AUTO = "auto"
 VALUE_GENERATE = "generate"
diff --git a/lib/objects.py b/lib/objects.py
index f03cb9f..20ea674 100644
--- a/lib/objects.py
+++ b/lib/objects.py
@@ -1007,6 +1007,17 @@ class BlockDevStatus(ConfigObject):
     ]
 
 
+class ImportExportStatus(ConfigObject):
+  """Config object representing the status of an import or export."""
+  __slots__ = [
+    "recent_output",
+    "listen_port",
+    "connected",
+    "exit_status",
+    "error_message",
+    ] + _TIMESTAMPS
+
+
 class ConfdRequest(ConfigObject):
   """Object holding a confd request.
 
diff --git a/test/import-export_unittest-helper 
b/test/import-export_unittest-helper
new file mode 100755
index 0000000..3968e1f
--- /dev/null
+++ b/test/import-export_unittest-helper
@@ -0,0 +1,77 @@
+#!/usr/bin/python
+#
+
+# Copyright (C) 2010 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Helpers for testing import-export daemon"""
+
+import os
+import sys
+import errno
+
+from ganeti import constants
+from ganeti import utils
+from ganeti import objects
+from ganeti import serializer
+
+
+RETRY_INTERVAL = 0.1
+TIMEOUT = int(os.getenv("TIMEOUT", 10))
+
+
+def _GetImportExportData(filename):
+  try:
+    data = utils.ReadFile(filename)
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      raise
+    raise utils.RetryAgain()
+
+  return objects.ImportExportStatus.FromDict(serializer.LoadJson(data))
+
+
+def _CheckConnected(filename):
+  if not _GetImportExportData(filename).connected:
+    raise utils.RetryAgain()
+
+
+def WaitForListenPort(filename):
+  return utils.Retry(lambda: _GetImportExportData(filename).listen_port,
+                     RETRY_INTERVAL, TIMEOUT)
+
+
+def WaitForConnected(filename):
+  utils.Retry(_CheckConnected, RETRY_INTERVAL, TIMEOUT, args=(filename, ))
+
+
+def main():
+  (filename, what) = sys.argv[1:]
+
+  if what == "listen-port":
+    print WaitForListenPort(filename)
+  elif what == "connected":
+    WaitForConnected(filename)
+  elif what == "gencert":
+    utils.GenerateSelfSignedSslCert(filename, validity=1)
+  else:
+    raise Exception("Unknown command '%s'" % what)
+
+
+if __name__ == "__main__":
+  main()
diff --git a/test/import-export_unittest.bash b/test/import-export_unittest.bash
new file mode 100755
index 0000000..3b992ed
--- /dev/null
+++ b/test/import-export_unittest.bash
@@ -0,0 +1,196 @@
+#!/bin/bash
+#
+
+# Copyright (C) 2010 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+set -e
+set -o pipefail
+
+export PYTHON=${PYTHON:=python}
+
+impexpd="$PYTHON daemons/import-export"
+
+# Add "-d" for debugging
+#impexpd+=' -d'
+
+err() {
+  echo "$@"
+  echo 'Aborting'
+  exit 1
+}
+
+checkpids() {
+  local result=0
+
+  # Unlike combining the "wait" commands using || or &&, this ensures we
+  # actually wait for all PIDs.
+  for pid in "$@"; do
+    if ! wait $pid; then
+      result=1
+    fi
+  done
+
+  return $result
+}
+
+get_testpath() {
+  echo "${TOP_SRCDIR:-.}/test"
+}
+
+get_testfile() {
+  echo "$(get_testpath)/data/$1"
+}
+
+statusdir=$(mktemp -d)
+trap "rm -rf $statusdir" EXIT
+
+src_statusfile=$statusdir/src.status
+src_x509=$statusdir/src.pem
+
+dst_statusfile=$statusdir/dst.status
+dst_x509=$statusdir/dst.pem
+dst_portfile=$statusdir/dst.port
+
+other_x509=$statusdir/other.pem
+
+testdata=$statusdir/data1
+
+cmd_prefix=
+cmd_suffix=
+
+$impexpd >/dev/null 2>&1 &&
+  err "daemon-util succeeded without parameters"
+
+$impexpd foo bar baz moo boo >/dev/null 2>&1 &&
+  err "daemon-util succeeded with wrong parameters"
+
+$impexpd $src_statusfile >/dev/null 2>&1 &&
+  err "daemon-util succeeded with insufficient parameters"
+
+$impexpd $src_statusfile invalidmode >/dev/null 2>&1 &&
+  err "daemon-util succeeded with invalid mode"
+
+cat $(get_testfile proc_drbd8.txt) $(get_testfile cert1.pem) > $testdata
+
+impexpd_helper() {
+  $PYTHON $(get_testpath)/import-export_unittest-helper "$@"
+}
+
+reset_status() {
+  rm -f $src_statusfile $dst_statusfile $dst_portfile
+}
+
+write_data() {
+  # Wait for connection to be established
+  impexpd_helper $dst_statusfile connected
+
+  cat $testdata
+}
+
+do_export() {
+  # Wait for listening port
+  impexpd_helper $dst_statusfile listen-port > $dst_portfile
+
+  local port=$(< $dst_portfile)
+
+  test -n "$port" || err 'Empty port file'
+
+  do_export_to_port $port
+}
+
+do_export_to_port() {
+  local port=$1
+
+  $impexpd $src_statusfile export --bind=127.0.0.1 \
+    --host=127.0.0.1 --port=$port \
+    --key=$src_x509 --cert=$src_x509 --ca=$dst_x509 \
+    --cmd-prefix="$cmd_prefix" --cmd-suffix="$cmd_suffix"
+}
+
+do_import() {
+  $impexpd $dst_statusfile import --bind=127.0.0.1 \
+    --host=127.0.0.1 \
+    --key=$dst_x509 --cert=$dst_x509 --ca=$src_x509 \
+    --cmd-prefix="$cmd_prefix" --cmd-suffix="$cmd_suffix"
+}
+
+# Generate X509 certificates and keys
+impexpd_helper $src_x509 gencert
+impexpd_helper $dst_x509 gencert
+impexpd_helper $other_x509 gencert
+
+# Normal case
+reset_status
+do_import > $statusdir/recv1 & imppid=$!
+write_data | do_export & exppid=$!
+checkpids $exppid $imppid || err 'An error occurred'
+cmp $testdata $statusdir/recv1 || err 'Received data does not match input'
+
+# Export using wrong CA
+reset_status
+do_import > /dev/null 2>&1 & imppid=$!
+: | dst_x509=$other_x509 do_export 2>/dev/null & exppid=$!
+checkpids $exppid $imppid && err 'Export did not fail when using wrong CA'
+
+# Import using wrong CA
+reset_status
+src_x509=$other_x509 do_import > /dev/null 2>&1 & imppid=$!
+: | do_export 2> /dev/null & exppid=$!
+checkpids $exppid $imppid && err 'Import did not fail when using wrong CA'
+
+# Suffix command on import
+reset_status
+cmd_suffix="| cksum > $statusdir/recv2" do_import & imppid=$!
+write_data | do_export & exppid=$!
+checkpids $exppid $imppid || err 'Testing additional commands failed'
+cmp $statusdir/recv2 <(cksum < $testdata) || \
+  err 'Checksum of received data does not match'
+
+# Prefix command on export
+reset_status
+do_import > $statusdir/recv3 & imppid=$!
+write_data | cmd_prefix="cksum |" do_export & exppid=$!
+checkpids $exppid $imppid || err 'Testing additional commands failed'
+cmp $statusdir/recv3 <(cksum < $testdata) || \
+  err 'Received checksum does not match'
+
+# Failing prefix command on export
+reset_status
+: | cmd_prefix='exit 1;' do_export_to_port 0 & exppid=$!
+checkpids $exppid && err 'Prefix command on export did not fail when it should'
+
+# Failing suffix command on export
+reset_status
+do_import > /dev/null & imppid=$!
+: | cmd_suffix='| exit 1' do_export & exppid=$!
+checkpids $imppid $exppid && \
+  err 'Suffix command on export did not fail when it should'
+
+# Failing prefix command on import
+reset_status
+cmd_prefix='exit 1;' do_import > /dev/null & imppid=$!
+checkpids $imppid && err 'Prefix command on import did not fail when it should'
+
+# Failing suffix command on import
+reset_status
+cmd_suffix='| exit 1' do_import > /dev/null & imppid=$!
+: | do_export & exppid=$!
+checkpids $imppid $exppid && \
+  err 'Suffix command on import did not fail when it should'
+
+exit 0
-- 
1.7.0.4



-- 
To unsubscribe, reply using "remove me" as the subject.

Reply via email to