Zhou Zheng Sheng has uploaded a new change for review. Change subject: storage.misc: Move execCmd and its dependencies to vdsm.utils ......................................................................
storage.misc: Move execCmd and its dependencies to vdsm.utils The original vdsm.utils.execCmd actually imports and invokes storage.misc.execCmd, which means if a process does not have /usr/share/vdsm in its PYTHONPATH and wants to use vdsm.utils.execCmd, it will fail. Furthermore, lib/vdsm/utils.py should not use anything that is not present in the Python standard library or in a dependency of the vdsm-python package. execCmd is very useful for modules in vdsm-tool; currently these modules use ad-hoc execCmd implementations. This patch moves storage.misc.execCmd and its dependencies to vdsm.utils, so that other modules can import the 'official' execCmd implementation without adding /usr/share/vdsm to PYTHONPATH. Change-Id: Id8fb42035b24bc915aaa700ab128580bdb609d9e Signed-off-by: Zhou Zheng Sheng <[email protected]> --- M lib/vdsm/utils.py M vdsm/storage/misc.py 2 files changed, 314 insertions(+), 314 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/21/14221/1 diff --git a/lib/vdsm/utils.py b/lib/vdsm/utils.py index e398a7e..cfec8cc 100644 --- a/lib/vdsm/utils.py +++ b/lib/vdsm/utils.py @@ -27,22 +27,35 @@ Contains a reverse dictionary pointing from error string to its error code. """ from SimpleXMLRPCServer import SimpleXMLRPCServer +from StringIO import StringIO +from weakref import proxy import SocketServer -import threading -import os -import time -import logging import errno -import subprocess -import pwd import fcntl import functools -import stat import glob +import io +import logging +import os +import pwd +import select +import signal +import stat +import subprocess +import threading +import time -import constants +from betterPopen import BetterPopen from config import config +import constants + +# Buffsize is 1K because I tested it on some use cases and 1k was fastets. 
If +# you find this number to be a bottleneck in any way you are welcome to change +# it +BUFFSIZE = 1024 + +SUDO_NON_INTERACTIVE_FLAG = "-n" _THP_STATE_PATH = '/sys/kernel/mm/transparent_hugepage/enabled' if not os.path.exists(_THP_STATE_PATH): @@ -147,10 +160,296 @@ return val -def execCmd(*args, **kwargs): - # import only after config as been initialized - from storage.misc import execCmd - return execCmd(*args, **kwargs) +# NOTE: it would be best to try and unify NoIntrCall and NoIntrPoll. +# We could do so defining a new object that can be used as a placeholer +# for the changing timeout value in the *args/**kwargs. This would +# lead us to rebuilding the function arguments at each loop. +def NoIntrPoll(pollfun, timeout=-1): + """ + This wrapper is used to handle the interrupt exceptions that might + occur during a poll system call. The wrapped function must be defined + as poll([timeout]) where the special timeout value 0 is used to return + immediately and -1 is used to wait indefinitely. + """ + # When the timeout < 0 we shouldn't compute a new timeout after an + # interruption. + endtime = None if timeout < 0 else time.time() + timeout + + while True: + try: + return pollfun(timeout) + except (IOError, select.error) as e: + if e.args[0] != errno.EINTR: + raise + + if endtime is not None: + timeout = max(0, endtime - time.time()) + + +class AsyncProc(object): + """ + AsyncProc is a funky class. It warps a standard subprocess.Popen + Object and gives it super powers. Like the power to read from a stream + without the fear of deadlock. It does this by always sampling all + stream while waiting for data. By doing this the other process can freely + write data to all stream without the fear of it getting stuck writing + to a full pipe. 
+ """ + class _streamWrapper(io.RawIOBase): + def __init__(self, parent, streamToWrap, fd): + io.IOBase.__init__(self) + self._stream = streamToWrap + self._parent = proxy(parent) + self._fd = fd + self._closed = False + self._emptyCounter = 0 + + def close(self): + if not self._closed: + self._closed = True + while not self._streamClosed: + self._parent._processStreams() + + @property + def closed(self): + return self._closed + + @property + def _streamClosed(self): + return (self.fileno() in self._parent._closedfds) + + def fileno(self): + return self._fd + + def seekable(self): + return False + + def readable(self): + return True + + def writable(self): + return True + + def read(self, length): + hasNewData = (self._stream.len - self._stream.pos) + if hasNewData < length and not self._streamClosed: + self._parent._processStreams() + + with self._parent._streamLock: + res = self._stream.read(length) + if self._stream.pos == self._stream.len: + if self._streamClosed and res == "": + self._emptyCounter += 1 + if self._emptyCounter > 2: + self._closed = True + + self._stream.truncate(0) + + return res + + def readinto(self, b): + data = self.read(len(b)) + bytesRead = len(data) + b[:bytesRead] = data + + return bytesRead + + def write(self, data): + if hasattr(data, "tobytes"): + data = data.tobytes() + with self._parent._streamLock: + oldPos = self._stream.pos + self._stream.pos = self._stream.len + self._stream.write(data) + self._stream.pos = oldPos + + while self._stream.len > 0 and not self._streamClosed: + self._parent._processStreams() + + if self._streamClosed: + self._closed = True + + if self._stream.len != 0: + raise IOError(errno.EPIPE, + "Could not write all data to stream") + + return len(data) + + def __init__(self, popenToWrap): + self._streamLock = threading.Lock() + self._proc = popenToWrap + + self._stdout = StringIO() + self._stderr = StringIO() + self._stdin = StringIO() + + fdout = self._proc.stdout.fileno() + fderr = self._proc.stderr.fileno() 
+ self._fdin = self._proc.stdin.fileno() + + self._closedfds = [] + + self._poller = select.epoll() + self._poller.register(fdout, select.EPOLLIN | select.EPOLLPRI) + self._poller.register(fderr, select.EPOLLIN | select.EPOLLPRI) + self._poller.register(self._fdin, 0) + self._fdMap = {fdout: self._stdout, + fderr: self._stderr, + self._fdin: self._stdin} + + self.stdout = io.BufferedReader(self._streamWrapper(self, + self._stdout, fdout), BUFFSIZE) + + self.stderr = io.BufferedReader(self._streamWrapper(self, + self._stderr, fderr), BUFFSIZE) + + self.stdin = io.BufferedWriter(self._streamWrapper(self, + self._stdin, self._fdin), BUFFSIZE) + + self._returncode = None + + def _processStreams(self): + if len(self._closedfds) == 3: + return + + if not self._streamLock.acquire(False): + self._streamLock.acquire() + self._streamLock.release() + return + try: + if self._stdin.len > 0 and self._stdin.pos == 0: + # Polling stdin is redundant if there is nothing to write + # trun on only if data is waiting to be pushed + self._poller.modify(self._fdin, select.EPOLLOUT) + + pollres = NoIntrPoll(self._poller.poll, 1) + + for fd, event in pollres: + stream = self._fdMap[fd] + if event & select.EPOLLOUT and self._stdin.len > 0: + buff = self._stdin.read(BUFFSIZE) + written = os.write(fd, buff) + stream.pos -= len(buff) - written + if stream.pos == stream.len: + stream.truncate(0) + self._poller.modify(fd, 0) + + elif event & (select.EPOLLIN | select.EPOLLPRI): + data = os.read(fd, BUFFSIZE) + oldpos = stream.pos + stream.pos = stream.len + stream.write(data) + stream.pos = oldpos + + elif event & (select.EPOLLHUP | select.EPOLLERR): + self._poller.unregister(fd) + self._closedfds.append(fd) + # I don't close the fd because the original Popen + # will do it. 
+ + if self.stdin.closed and self._fdin not in self._closedfds: + self._poller.unregister(self._fdin) + self._closedfds.append(self._fdin) + self._proc.stdin.close() + + finally: + self._streamLock.release() + + @property + def pid(self): + return self._proc.pid + + @property + def returncode(self): + if self._returncode is None: + self._returncode = self._proc.poll() + return self._returncode + + def kill(self): + try: + self._proc.kill() + except OSError as ex: + if ex.errno != errno.EPERM: + raise + execCmd([constants.EXT_KILL, "-%d" % (signal.SIGTERM,), + str(self.pid)], sudo=True) + + def wait(self, timeout=None, cond=None): + startTime = time.time() + while self.returncode is None: + if timeout is not None and (time.time() - startTime) > timeout: + return False + if cond is not None and cond(): + return False + self._processStreams() + return True + + def communicate(self, data=None): + if data is not None: + self.stdin.write(data) + self.stdin.flush() + self.stdin.close() + + self.wait() + return "".join(self.stdout), "".join(self.stderr) + + def __del__(self): + self._poller.close() + + +def execCmd(command, sudo=False, cwd=None, data=None, raw=False, logErr=True, + printable=None, env=None, sync=True, nice=None, ioclass=None, + ioclassdata=None, setsid=False, execCmdLogger=logging.root): + """ + Executes an external command, optionally via sudo. 
+ """ + if ioclass is not None: + cmd = command + command = [constants.EXT_IONICE, '-c', str(ioclass)] + if ioclassdata is not None: + command.extend(("-n", str(ioclassdata))) + + command = command + cmd + + if nice is not None: + command = [constants.EXT_NICE, '-n', str(nice)] + command + + if setsid: + command = [constants.EXT_SETSID] + command + + if sudo: + command = [constants.EXT_SUDO, SUDO_NON_INTERACTIVE_FLAG] + command + + if not printable: + printable = command + + cmdline = repr(subprocess.list2cmdline(printable)) + execCmdLogger.debug("%s (cwd %s)", cmdline, cwd) + + p = BetterPopen(command, close_fds=True, cwd=cwd, env=env) + p = AsyncProc(p) + if not sync: + if data is not None: + p.stdin.write(data) + p.stdin.flush() + + return p + + (out, err) = p.communicate(data) + + if out is None: + # Prevent splitlines() from barfing later on + out = "" + + execCmdLogger.debug("%s: <err> = %s; <rc> = %d", + {True: "SUCCESS", False: "FAILED"}[p.returncode == 0], + repr(err), p.returncode) + + if not raw: + out = out.splitlines(False) + err = err.splitlines(False) + + return (p.returncode, out, err) def checkPathStat(pathToCheck): diff --git a/vdsm/storage/misc.py b/vdsm/storage/misc.py index ae92b75..bee9daa 100644 --- a/vdsm/storage/misc.py +++ b/vdsm/storage/misc.py @@ -31,25 +31,19 @@ from contextlib import contextmanager from functools import wraps, partial from itertools import chain, imap -from StringIO import StringIO -from weakref import proxy import contextlib import errno import glob -import io import logging import os import Queue import random import re import select -import signal import string import struct -import subprocess import sys import threading -import time import types import weakref import fcntl @@ -58,7 +52,6 @@ from vdsm import constants from vdsm import utils import storage_exception as se -from betterPopen import BetterPopen import fileUtils import logUtils @@ -70,7 +63,6 @@ UUID_HYPHENS = [8, 13, 18, 23] OVIRT_NODE = False MEGA = 1 
<< 20 -SUDO_NON_INTERACTIVE_FLAG = "-n" UNLIMITED_THREADS = -1 log = logging.getLogger('Storage.Misc') @@ -86,11 +78,6 @@ logger, logger.__class__) return logger - -# Buffsize is 1K because I tested it on some use cases and 1k was fastets. If -# you find this number to be a bottleneck in any way you are welcome to change -# it -BUFFSIZE = 1024 def stripNewLines(lines): @@ -193,60 +180,8 @@ LOW = 19 -@logskip("Storage.Misc.excCmd") -def execCmd(command, sudo=False, cwd=None, data=None, raw=False, logErr=True, - printable=None, env=None, sync=True, nice=None, ioclass=None, - ioclassdata=None, setsid=False): - """ - Executes an external command, optionally via sudo. - """ - if ioclass is not None: - cmd = command - command = [constants.EXT_IONICE, '-c', str(ioclass)] - if ioclassdata is not None: - command.extend(("-n", str(ioclassdata))) - - command = command + cmd - - if nice is not None: - command = [constants.EXT_NICE, '-n', str(nice)] + command - - if setsid: - command = [constants.EXT_SETSID] + command - - if sudo: - command = [constants.EXT_SUDO, SUDO_NON_INTERACTIVE_FLAG] + command - - if not printable: - printable = command - - cmdline = repr(subprocess.list2cmdline(printable)) - execCmdLogger.debug("%s (cwd %s)", cmdline, cwd) - - p = BetterPopen(command, close_fds=True, cwd=cwd, env=env) - p = AsyncProc(p) - if not sync: - if data is not None: - p.stdin.write(data) - p.stdin.flush() - - return p - - (out, err) = p.communicate(data) - - if out is None: - # Prevent splitlines() from barfing later on - out = "" - - execCmdLogger.debug("%s: <err> = %s; <rc> = %d", - {True: "SUCCESS", False: "FAILED"}[p.returncode == 0], - repr(err), p.returncode) - - if not raw: - out = out.splitlines(False) - err = err.splitlines(False) - - return (p.returncode, out, err) +execCmd = partial(logskip("Storage.Misc.excCmd")(utils.execCmd), + execCmdLogger=execCmdLogger) def pidExists(pid): @@ -786,217 +721,6 @@ self._finally.insert(0, (func, args, kwargs)) -class 
AsyncProc(object): - """ - AsyncProc is a funky class. It warps a standard subprocess.Popen - Object and gives it super powers. Like the power to read from a stream - without the fear of deadlock. It does this by always sampling all - stream while waiting for data. By doing this the other process can freely - write data to all stream without the fear of it getting stuck writing - to a full pipe. - """ - class _streamWrapper(io.RawIOBase): - def __init__(self, parent, streamToWrap, fd): - io.IOBase.__init__(self) - self._stream = streamToWrap - self._parent = proxy(parent) - self._fd = fd - self._closed = False - self._emptyCounter = 0 - - def close(self): - if not self._closed: - self._closed = True - while not self._streamClosed: - self._parent._processStreams() - - @property - def closed(self): - return self._closed - - @property - def _streamClosed(self): - return (self.fileno() in self._parent._closedfds) - - def fileno(self): - return self._fd - - def seekable(self): - return False - - def readable(self): - return True - - def writable(self): - return True - - def read(self, length): - hasNewData = (self._stream.len - self._stream.pos) - if hasNewData < length and not self._streamClosed: - self._parent._processStreams() - - with self._parent._streamLock: - res = self._stream.read(length) - if self._stream.pos == self._stream.len: - if self._streamClosed and res == "": - self._emptyCounter += 1 - if self._emptyCounter > 2: - self._closed = True - - self._stream.truncate(0) - - return res - - def readinto(self, b): - data = self.read(len(b)) - bytesRead = len(data) - b[:bytesRead] = data - - return bytesRead - - def write(self, data): - if hasattr(data, "tobytes"): - data = data.tobytes() - with self._parent._streamLock: - oldPos = self._stream.pos - self._stream.pos = self._stream.len - self._stream.write(data) - self._stream.pos = oldPos - - while self._stream.len > 0 and not self._streamClosed: - self._parent._processStreams() - - if self._streamClosed: - 
self._closed = True - - if self._stream.len != 0: - raise IOError(errno.EPIPE, - "Could not write all data to stream") - - return len(data) - - def __init__(self, popenToWrap): - self._streamLock = threading.Lock() - self._proc = popenToWrap - - self._stdout = StringIO() - self._stderr = StringIO() - self._stdin = StringIO() - - fdout = self._proc.stdout.fileno() - fderr = self._proc.stderr.fileno() - self._fdin = self._proc.stdin.fileno() - - self._closedfds = [] - - self._poller = select.epoll() - self._poller.register(fdout, select.EPOLLIN | select.EPOLLPRI) - self._poller.register(fderr, select.EPOLLIN | select.EPOLLPRI) - self._poller.register(self._fdin, 0) - self._fdMap = {fdout: self._stdout, - fderr: self._stderr, - self._fdin: self._stdin} - - self.stdout = io.BufferedReader(self._streamWrapper(self, - self._stdout, fdout), BUFFSIZE) - - self.stderr = io.BufferedReader(self._streamWrapper(self, - self._stderr, fderr), BUFFSIZE) - - self.stdin = io.BufferedWriter(self._streamWrapper(self, - self._stdin, self._fdin), BUFFSIZE) - - self._returncode = None - - def _processStreams(self): - if len(self._closedfds) == 3: - return - - if not self._streamLock.acquire(False): - self._streamLock.acquire() - self._streamLock.release() - return - try: - if self._stdin.len > 0 and self._stdin.pos == 0: - # Polling stdin is redundant if there is nothing to write - # trun on only if data is waiting to be pushed - self._poller.modify(self._fdin, select.EPOLLOUT) - - pollres = NoIntrPoll(self._poller.poll, 1) - - for fd, event in pollres: - stream = self._fdMap[fd] - if event & select.EPOLLOUT and self._stdin.len > 0: - buff = self._stdin.read(BUFFSIZE) - written = os.write(fd, buff) - stream.pos -= len(buff) - written - if stream.pos == stream.len: - stream.truncate(0) - self._poller.modify(fd, 0) - - elif event & (select.EPOLLIN | select.EPOLLPRI): - data = os.read(fd, BUFFSIZE) - oldpos = stream.pos - stream.pos = stream.len - stream.write(data) - stream.pos = oldpos - 
- elif event & (select.EPOLLHUP | select.EPOLLERR): - self._poller.unregister(fd) - self._closedfds.append(fd) - # I don't close the fd because the original Popen - # will do it. - - if self.stdin.closed and self._fdin not in self._closedfds: - self._poller.unregister(self._fdin) - self._closedfds.append(self._fdin) - self._proc.stdin.close() - - finally: - self._streamLock.release() - - @property - def pid(self): - return self._proc.pid - - @property - def returncode(self): - if self._returncode is None: - self._returncode = self._proc.poll() - return self._returncode - - def kill(self): - try: - self._proc.kill() - except OSError as ex: - if ex.errno != errno.EPERM: - raise - execCmd([constants.EXT_KILL, "-%d" % (signal.SIGTERM,), - str(self.pid)], sudo=True) - - def wait(self, timeout=None, cond=None): - startTime = time.time() - while self.returncode is None: - if timeout is not None and (time.time() - startTime) > timeout: - return False - if cond is not None and cond(): - return False - self._processStreams() - return True - - def communicate(self, data=None): - if data is not None: - self.stdin.write(data) - self.stdin.flush() - self.stdin.close() - - self.wait() - return "".join(self.stdout), "".join(self.stderr) - - def __del__(self): - self._poller.close() - - class DynamicBarrier(object): def __init__(self): self._lock = threading.Lock() @@ -1374,30 +1098,7 @@ break -# NOTE: it would be best to try and unify NoIntrCall and NoIntrPoll. -# We could do so defining a new object that can be used as a placeholer -# for the changing timeout value in the *args/**kwargs. This would -# lead us to rebuilding the function arguments at each loop. -def NoIntrPoll(pollfun, timeout=-1): - """ - This wrapper is used to handle the interrupt exceptions that might - occur during a poll system call. The wrapped function must be defined - as poll([timeout]) where the special timeout value 0 is used to return - immediately and -1 is used to wait indefinitely. 
- """ - # When the timeout < 0 we shouldn't compute a new timeout after an - # interruption. - endtime = None if timeout < 0 else time.time() + timeout - - while True: - try: - return pollfun(timeout) - except (IOError, select.error) as e: - if e.args[0] != errno.EINTR: - raise - - if endtime is not None: - timeout = max(0, endtime - time.time()) +NoIntrPoll = utils.NoIntrPoll def isAscii(s): -- To view, visit http://gerrit.ovirt.org/14221 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Id8fb42035b24bc915aaa700ab128580bdb609d9e Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Zhou Zheng Sheng <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
