Hello everyone,

Since I'm not happy with shmux or pssh, I wrote my own "concurrent ssh" program for parallel execution of SSH commands on multiple hosts. Before I release the program into the wild, I would like to hear (constructive) comments on what may be wrong with it and/or how to fix it. (Note: the program requires the paramiko SSH client module.)

#!/usr/local/bin/python -W ignore::DeprecationWarning

import operator
import optparse
import os
import socket
import subprocess
import sys
import threading
import time

import paramiko

# Command-line interface. One add_option() call per statement; numeric options
# declare a type so optparse converts them (and rejects non-numeric input)
# instead of handing the rest of the program a string.
usage = "Usage: cssh [options] IP1 hostname2 IP3 hostname4 ...\n\n(IPs/hostnames on the commandline are actually optional, they can be specified in the file, see below.)"
op = optparse.OptionParser(usage=usage)

op.add_option('-c', '--cmd', dest='cmd',
              help="""Command to run. Mutually exclusive with -s.""")
op.add_option('-s', '--script', dest='script',
              help="""Script file to run. Mutually exclusive with -c. Script can have its own arguments, specify them in doublequotes, like "script -arg arg".""")
op.add_option('-i', '--script-dir', dest='scriptdir',
              help="""The directory where script will be copied and executed. Defaults to /tmp.""")
op.add_option('-l', '--cleanup', dest='cleanup', action='store_true',
              help="""Delete the script on remote hosts after executing it.""")
op.add_option('-f', '--file', dest='file',
              help="""File with hosts to use, one host per line. Concatenated with list of hosts/IP addresses specified at the end of the commandline. Optionally, in a line of the file you can specify sequence: "Address/Hostname Username Password SSH_Port" separated by spaces (additional parameters can be specified on a subset of lines; where not specified, relevant parameters take default values).""")
op.add_option('-d', '--dir', dest='dir',
              help='Directory for storing standard output and standard error of command. If specified, directory will be created, with subdirs named IPs/hostnames and relevant files stored in those subdirs.')
op.add_option('-u', '--username', dest='username',
              help="""Username to specify for SSH. Defaults to 'root'.""")
op.add_option('-p', '--password', dest='password',
              help="""Password. Password is used first; if connection fails using password, cssh uses SSH key (default or specified).""")
op.add_option('-o', '--port', dest='port', type='int',
              help="""Default SSH port.""")
op.add_option('-k', '--key', dest='key',
              help="""SSH Key file. Defaults to '/root/.ssh/id_dsa'.""")
op.add_option('-n', '--nokey', dest='nokey', action="store_true",
              help="""Turns off using SSH key.""")
op.add_option('-t', '--timeout', dest='timeout', type='float',
              help="""SSH connection timeout. Defaults to 20 seconds.""")
op.add_option('-m', '--monochromatic', dest='mono', action='store_true',
              help="""Do not use colors while printing output.""")
op.add_option('-r', '--maxthreads', dest='maxthreads', type='int',
              help="""Maximum number of threads working concurrently. Default is 100. Exceeding 200 is generally not recommended due to potential exhaustion of address space (each thread can use 10 MB of address space and 32-bit systems have a maximum of 4GB of address space).""")
op.add_option('-q', '--quiet', dest='quiet', action='store_true',
              help="""Quiet. Do not print out summaries like IPs for which communication succeeded or failed, etc.""")

# add resource file?

# opts carries the option values (None where an option was not given);
# args carries the positional host names/IPs.
(opts, args) = op.parse_args()

# Validate the parsed options. All problems are reported before exiting so the
# user can fix them in one go; the exit status is non-zero so calling scripts
# can detect the failure.
# print(...) is always given a single string so it behaves the same whether
# print is a statement or a function.
failit = False

if opts.cmd is None and opts.script is None:
    print("You have to specify one of the following: command to run, using -c command or --cmd command, or script to run, using -s scriptfile or --script scriptfile.")
    print("")
    failit = True

if opts.cmd is not None and opts.script is not None:
    print("Options command (-c) and script (-s) are mutually exclusive. Specify either one.")
    print("")
    failit = True

if opts.cmd is None and opts.script is not None:
    # The -s value may carry arguments ("script -arg arg"); only the first
    # word is the local file. Check early that it is readable.
    try:
        # scriptpath stays a module-level name; other code in this file may
        # refer to it.
        scriptpath = opts.script.split()[0]
        with open(scriptpath, 'r'):
            pass
    except (IOError, IndexError):
        # IndexError: the -s value was empty or whitespace only.
        print("Could not open script file %s." % opts.script)
        print("")
        failit = True

if opts.file is None and args == []:
    print("You have to specify at least one of the following:")
    print(" - list of IPs/hostnames at the end of the command line (after all options)")
    print(" - list of IPs/hostnames stored in file specified after -f or --file option (like: -f hostnames.txt)")
    print(" You can also specify both sources. In that case IP/hostname lists will be concatenated.")
    print("")
    failit = True

if opts.password is None and opts.nokey:
    print("Since using key has been turned off using -n option, you have to specify password using -p password or --password password.")
    print("")
    failit = True

if opts.key is not None and opts.nokey:
    print("Options -n and -k keyfile are mutually exclusive. Specify either one.")
    print("")
    failit = True

if failit:
    sys.exit(1)

# Fill in defaults for every option that was not given on the command line.
# 'username' is included because the -u help text promises a default of
# 'root'; without it a missing -u would reach the SSH client as None.
_OPTION_DEFAULTS = (
    ('scriptdir', '/tmp'),
    ('cleanup', False),
    ('username', 'root'),
    ('key', '/root/.ssh/id_dsa'),
    ('port', 22),
    ('timeout', 20),
    ('mono', False),
    ('maxthreads', 100),
    ('quiet', False),
)

for _name, _default in _OPTION_DEFAULTS:
    if getattr(opts, _name) is None:
        setattr(opts, _name, _default)

# -n wins over the default key: with key usage turned off, no key file is
# offered to the SSH client at all.
if opts.nokey:
    opts.key = None


# Terminal colour escape sequences used to decorate the output.
HEADER = '\033[95m'
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
OKBLUE = '\033[94m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
ENDC = '\033[0m'  # appended after coloured text wherever the colours are used

# -m / --monochromatic: blank every sequence so the same print code emits
# plain text.
if opts.mono:
    HEADER = BLACK = RED = GREEN = YELLOW = OKBLUE = BLUE = MAGENTA = CYAN = WHITE = ENDC = ''

# Build the work list: hosts from the command line followed by the lines of
# the optional -f file. Each entry ends up as a list of whitespace-separated
# fields (address first, optional extra fields after it).
hosts = args[:]
fhosts = []
fname = opts.file

if fname is not None:
    try:
        # 'with' closes the file even if reading fails part-way.
        with open(fname) as hostfile:
            fhosts = hostfile.readlines()
    except IOError as e:
        print("Error: " + str(e))
        sys.exit(1)

hosts.extend(fhosts)

# Drop blank entries *after* stripping: a line containing only spaces or tabs
# would otherwise become an empty field list and crash later on field [0].
hosts = [s.split() for s in hosts if s is not None and s.strip()]

if hosts == []:
    print("Error: list of hosts is empty. Quitting")
    sys.exit(1)


class SSHThread(threading.Thread):
    """Worker thread that runs one command or one script on one host over SSH.

    The thread connects (password first, then key file), optionally uploads a
    script, executes the command, and prints the result while holding the
    shared ``lock`` so output from different hosts does not interleave.

    Attributes read by the dispatcher:
        finished  -- True once run() has ended, whether normally or not.
        confailed -- True unless the connection and (if any) the script
                     upload both succeeded.

    All print(...) calls pass a single pre-joined string so they behave the
    same whether print is a statement or a function.
    """

    def __init__(self, lock, cmd, ip, username, sshprivkey=None, passw=None, port=22, script=None, scriptdir=None):
        """Store connection parameters and pre-compute script paths.

        lock       -- lock shared by all workers; held while printing.
        cmd        -- command line to execute, or None when a script is used.
        ip         -- address or host name to connect to.
        username   -- SSH user name.
        sshprivkey -- private key file name, or None to skip key login.
        passw      -- password, or None to skip password login.
        port       -- SSH port.
        script     -- local script path optionally followed by arguments.
        scriptdir  -- remote directory the script is uploaded into.
        """
        threading.Thread.__init__(self)

        self.lock = lock
        self.cmd = cmd
        self.ip = ip
        self.username = username
        self.sshprivkey = sshprivkey
        self.passw = passw
        self.port = port
        self.conobj = None
        self.confailed = True
        self.finished = False

        # Always define the script attributes so later code can test them
        # instead of tripping over a missing attribute in command mode.
        self.scriptpath = None
        self.scriptname = None
        self.scriptargs = None
        self.scriptdir = None
        self.rspath = None
        if script is not None:
            script = script.strip()
            self.scriptpath = script.split()[0]
            self.scriptname = self.scriptpath.split('/')[-1]
            # Everything after the path, leading blank included, so it can be
            # appended directly to the remote path.
            self.scriptargs = script[len(self.scriptpath):]
            self.scriptdir = (scriptdir or '/tmp').strip().rstrip('/')
            self.rspath = self.scriptdir + '/' + self.scriptname

    def ping(self, lock, ip):
        """Ping ``ip`` once with /bin/ping; return its (stdout, stderr).

        ``lock`` is accepted but not used.
        """
        subp = subprocess.Popen(['/bin/ping', '-c', '1', ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return subp.communicate()

    def ssh_connect(self):
        """Open the SSH connection; leave ``self.conobj`` None on failure.

        Password login is tried first (when a password is set), then key-file
        login (when a key file is set).
        """
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # asking; kept from the original.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # The option may arrive as a string from the command line.
        timeout = float(opts.timeout)

        attempts = []
        if self.passw is not None:
            attempts.append(dict(password=self.passw, allow_agent=False, look_for_keys=False))
        if self.sshprivkey is not None:
            attempts.append(dict(key_filename=self.sshprivkey))

        self.conobj = None
        for extra in attempts:
            try:
                client.connect(self.ip, username=self.username, port=self.port, timeout=timeout, **extra)
            except Exception:
                # Deliberately best effort: any failure just means "try the
                # next method". Unlike a bare except this lets
                # KeyboardInterrupt and SystemExit through.
                continue
            self.conobj = client
            return
        client.close()

    def execcmds(self):
        """Run ``self.cmd`` remotely; return (stdout_text, stderr_text).

        CRLF line endings are converted to LF. If execution itself fails, the
        failure is returned as the stderr text instead of being dropped.
        """
        try:
            si, so, se = self.conobj.exec_command(self.cmd)
            out = ''.join(line.replace('\r\n', '\n') for line in so.readlines())
            err = ''.join(line.replace('\r\n', '\n') for line in se.readlines())
        except Exception as e:
            return ('', 'cssh: command execution failed: %s\n' % (str(e) or repr(e)))
        return (out, err)

    def sendscript(self):
        """Upload the local script to ``self.rspath`` through a remote scp.

        Returns '' on success, otherwise a non-empty error description.
        """
        try:
            with open(self.scriptpath, 'rb') as fo:
                cnt = fo.read()
        except IOError as e:
            return str(e) or repr(e)

        try:
            channel = self.conobj.get_transport().open_session()
        except paramiko.SSHException as e:
            return str(e) or repr(e)

        try:
            channel.exec_command('scp -t -v %s\n' % self.rspath)
            # Size comes from the bytes actually read, so header and payload
            # always agree.
            channel.sendall('C0755 %d 1\n' % len(cnt))
            while not channel.recv_ready():
                # Do not wait forever if the remote side has already gone.
                if channel.exit_status_ready():
                    return 'remote scp exited before accepting the script'
                time.sleep(0.1)
            # sendall() keeps sending until the whole payload is out; send()
            # may stop after a partial write.
            channel.sendall(cnt)
        except (paramiko.SSHException, socket.error) as e:
            return str(e) or repr(e)
        finally:
            channel.close()
        return ''

    def setcmdtoscript(self):
        """Point ``self.cmd`` at the uploaded script plus its arguments."""
        self.cmd = self.rspath + self.scriptargs

    def execcmdonscript(self, cmd):
        """Run a helper command on the script; print any output as an error."""
        si, so, se = self.conobj.exec_command(cmd)
        sol = so.readlines()
        sel = se.readlines()
        if sol or sel:
            with self.lock:
                print(" ".join([
                    RED + "Host %s, Error while executing %s on script:" % (self.ip, cmd),
                    "".join(sel),
                    "".join(sol) + ENDC,
                ]))

    def chmodscript(self):
        """Make the uploaded script executable."""
        # just in case, as sometimes and on some operating systems the
        # execution flags on the script are not always set
        self.execcmdonscript('chmod 0755 %s' % self.rspath)

    def delscript(self):
        """Delete the uploaded script from the remote host."""
        self.execcmdonscript('rm -f %s' % self.rspath)

    def _run_remote(self):
        """Upload if needed, then execute; return (stdout, stderr, upload_error)."""
        so = se = res = ''
        # Remember the mode now: setcmdtoscript() overwrites self.cmd.
        script_mode = self.cmd is None
        if script_mode:
            res = self.sendscript()
            if res == '':
                self.setcmdtoscript()
                self.chmodscript()
        if res == '':
            so, se = self.execcmds()
        # Only script mode has a remote file to remove.
        if script_mode and opts.cleanup:
            time.sleep(0.5)
            self.delscript()
        return (so, se, res)

    def _save_output(self, so, se):
        """Write stdout/stderr text to <opts.dir>/<ip>/stdout and .../stderr."""
        path = os.path.join(opts.dir, self.ip)
        if not os.path.isdir(path):
            os.makedirs(path)
        for name, text in (('stdout', so), ('stderr', se)):
            with open(os.path.join(path, name), 'w') as of:
                of.write(text)

    def _report(self, so, se, res):
        """Print this host's outcome and set ``confailed``; caller holds the lock."""
        prefix = OKBLUE + "%-20s" % self.ip + ENDC + " : "
        if self.conobj is None:
            self.confailed = True
            print(prefix + RED + "SSH connection failed" + ENDC)
        elif res != '':
            self.confailed = True
            print(prefix + RED + "Sending script failed: %s" % res + ENDC)
        else:
            self.confailed = False
            print(prefix + OKBLUE + "SSH connection successful" + ENDC)
            print(so)
            if se != '':
                print(MAGENTA + "command standard error output:" + ENDC)
                print(se)
            if opts.dir is not None:
                self._save_output(so, se)

    def run(self):
        """Thread body: connect, execute, report.

        ``finished`` is set in a ``finally`` clause so that code polling it
        sees the thread end even when an unexpected exception escapes.
        """
        so = se = res = ''
        try:
            self.ssh_connect()
            if self.conobj is not None:
                so, se, res = self._run_remote()
            with self.lock:
                self._report(so, se, res)
        finally:
            self.finished = True

    def sshclose(self):
        """Close the SSH connection if one was opened."""
        if self.conobj is not None:
            self.conobj.close()


# State shared between the dispatcher and the worker threads.
# queue holds (ip, thread) pairs for workers that are still tracked as
# running; thfinished collects the pairs once they are done.
queue, thfinished = [], []

# Handed to every worker, which holds it while printing.
lock = threading.Lock()

def getparams(h, defaults=None):
    """Return (ip, username, password, port) for one host entry.

    h        -- list of fields: address first, then optionally user name,
                password and port, in that order.
    defaults -- object with ``username``, ``password`` and ``port``
                attributes supplying values for missing fields; the
                module-level ``opts`` is used when omitted.

    The returned port is an int, or None when the port text cannot be
    converted (an error line is printed in that case).
    """
    if defaults is None:
        defaults = opts

    ip = h[0]
    username = h[1] if len(h) > 1 else defaults.username
    passw = h[2] if len(h) > 2 else defaults.password

    if len(h) > 3:
        raw_port = h[3]
    elif defaults.port is not None:
        # Honour the configured default port rather than a hard-coded 22.
        raw_port = defaults.port
    else:
        raw_port = 22

    try:
        port = int(raw_port)
    except (TypeError, ValueError) as e:
        port = None
        print(RED + "%-20s" % ip + " : error converting port: " + str(e) + ENDC)
    return (ip, username, passw, port)

def _reap_finished():
    """Move finished workers from ``queue`` to ``thfinished``.

    A fresh list of survivors is built instead of removing items from
    ``queue`` while iterating over it, which would skip the entry following
    each removed one.
    """
    still_running = []
    for ip, th in queue:
        if th.finished:
            th.sshclose()
            th.join()
            thfinished.append((ip, th))
        else:
            still_running.append((ip, th))
    queue[:] = still_running


# The option may arrive as a string from the command line; never allow a cap
# below one or the loop below could not make progress.
max_threads = max(1, int(opts.maxthreads))

# Start workers, keeping at most max_threads of them alive at once.
while hosts:
    if len(queue) < max_threads:
        h = hosts.pop()
        (ip, username, passw, port) = getparams(h)
        if port is not None:
            th = SSHThread(lock, opts.cmd, ip, username=username, sshprivkey=opts.key, passw=passw, port=port, script=opts.script, scriptdir=opts.scriptdir)
            queue.append((ip, th))
            th.daemon = True
            th.start()
        else:
            # Bad port: record the host as failed without starting a thread.
            thfinished.append((ip, None))
    else:
        time.sleep(1)
    _reap_finished()

# All hosts dispatched: wait for the remaining workers.
while queue:
    _reap_finished()
    if queue:
        time.sleep(1)

# Final summary (suppressed by -q): list the hosts by outcome. An entry whose
# thread is None never had a worker started for it and counts as failed.
if not opts.quiet:
    print("")
    print(OKBLUE + 'Communication SUCCEEDED for following IP addresses (SSH could open connection):' + ENDC)

    for ip, th in thfinished:
        if th is not None and not th.confailed:
            print(ip)

    print("")
    print(OKBLUE + 'Communication FAILED for following IP addresses (SSH could not open connection / error in parameters):' + ENDC)

    for ip, th in thfinished:
        if th is None or th.confailed:
            print(ip)





--
http://mail.python.org/mailman/listinfo/python-list

Reply via email to