changeset 4358cf55befc in /home/hg/repos/gajim

details:http://hg.gajim.org/gajim?cmd=changeset;node=4358cf55befc
description: update python-gnupg to 0.3.0 + add KEYEXPIRED support. Fixes #7151

diffstat:

 src/common/gnupg.py |  1190 ++++++++++++++++++++++++++------------------------
 src/common/gpg.py   |     4 +-
 2 files changed, 622 insertions(+), 572 deletions(-)

diffs (truncated from 1364 to 300 lines):

diff -r 1c957ee45b0e -r 4358cf55befc src/common/gnupg.py
--- a/src/common/gnupg.py       Wed May 16 17:23:34 2012 +0400
+++ b/src/common/gnupg.py       Fri May 18 16:58:51 2012 +0200
@@ -27,25 +27,22 @@
 and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
 the previous versions.
 
-Modifications Copyright (C) 2008-2011 Vinay Sajip. All rights reserved.
+Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved.
 
 A unittest harness (test_gnupg.py) has also been added.
 """
 import locale
 
+__version__ = "0.3.0"
 __author__ = "Vinay Sajip"
-__date__  = "$25-Jan-2011 11:40:48$"
+__date__  = "$12-May-2012 10:49:10$"
 
 try:
     from io import StringIO
-    from io import TextIOWrapper
-    from io import BufferedReader
-    from io import BufferedWriter
 except ImportError:
     from cStringIO import StringIO
-    class BufferedReader: pass
-    class BufferedWriter: pass
 
+import codecs
 import locale
 import logging
 import os
@@ -110,34 +107,11 @@
     passphrase = '%s\n' % passphrase
     passphrase = passphrase.encode(encoding)
     stream.write(passphrase)
-    logger.debug("Passphrase written")
+    logger.debug("Wrote passphrase: %r", passphrase)
 
 def _is_sequence(instance):
     return isinstance(instance,list) or isinstance(instance,tuple)
 
-def _wrap_input(inp):
-    if isinstance(inp, BufferedWriter):
-        oldinp = inp
-        inp = TextIOWrapper(inp)
-        logger.debug('wrapped input: %r -> %r', oldinp, inp)
-    return inp
-
-def _wrap_output(outp):
-    if isinstance(outp, BufferedReader):
-        oldoutp = outp
-        outp = TextIOWrapper(outp)
-        logger.debug('wrapped output: %r -> %r', oldoutp, outp)
-    return outp
-
-#The following is needed for Python2.7 :-(
-def _make_file(s):
-    try:
-        rv = StringIO(s)
-    except (TypeError, UnicodeError):
-        from io import BytesIO
-        rv = BytesIO(s)
-    return rv
-
 def _make_binary_stream(s, encoding):
     try:
         if _py3k:
@@ -152,526 +126,11 @@
         rv = StringIO(s)
     return rv
 
-class GPG(object):
-    "Encapsulate access to the gpg executable"
-    def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, 
use_agent=False):
-        """Initialize a GPG process wrapper.  Options are:
-
-        gpgbinary -- full pathname for GPG binary.
-
-        gnupghome -- full pathname to where we can find the public and
-        private keyrings.  Default is whatever gpg defaults to.
-        """
-        self.gpgbinary = gpgbinary
-        self.gnupghome = gnupghome
-        self.verbose = verbose
-        self.use_agent = use_agent
-        self.encoding = locale.getpreferredencoding()
-        if self.encoding is None: # This happens on Jython!
-            self.encoding = sys.stdin.encoding
-        if gnupghome and not os.path.isdir(self.gnupghome):
-            os.makedirs(self.gnupghome,0x1C0)
-        p = self._open_subprocess(["--version"])
-        result = Verify() # any result will do for this
-        self._collect_output(p, result, stdin=p.stdin)
-        if p.returncode != 0:
-            raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
-                                                             result.stderr))
-
-    def _open_subprocess(self, args, passphrase=False):
-        # Internal method: open a pipe to a GPG subprocess and return
-        # the file objects for communicating with it.
-        cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
-        if self.gnupghome:
-            cmd.append('--homedir "%s" ' % self.gnupghome)
-        if passphrase:
-            cmd.append('--batch --passphrase-fd 0')
-        if self.use_agent:
-            cmd.append('--use-agent')
-        cmd.extend(args)
-        cmd = ' '.join(cmd)
-        if self.verbose:
-            print(cmd)
-        logger.debug("%s", cmd)
-        return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
-
-    def _read_response(self, stream, result):
-        # Internal method: reads all the output from GPG, taking notice
-        # only of lines that begin with the magic [GNUPG:] prefix.
-        #
-        # Calls methods on the response object for each valid token found,
-        # with the arg being the remainder of the status line.
-        lines = []
-        while True:
-            line = stream.readline()
-            lines.append(line)
-            if self.verbose:
-                print(line)
-            logger.debug("%s", line.rstrip())
-            if line == "": break
-            line = line.rstrip()
-            if line[0:9] == '[GNUPG:] ':
-                # Chop off the prefix
-                line = line[9:]
-                L = line.split(None, 1)
-                keyword = L[0]
-                if len(L) > 1:
-                    value = L[1]
-                else:
-                    value = ""
-                result.handle_status(keyword, value)
-        result.stderr = ''.join(lines)
-
-    def _read_data(self, stream, result):
-        # Read the contents of the file from GPG's stdout
-        chunks = []
-        while True:
-            data = stream.read(1024)
-            if len(data) == 0:
-                break
-            logger.debug("chunk: %r" % data[:256])
-            chunks.append(data)
-        if _py3k:
-            # Join using b'' or '', as appropriate
-            result.data = type(data)().join(chunks)
-        else:
-            result.data = ''.join(chunks)
-
-    def _collect_output(self, process, result, writer=None, stdin=None):
-        """
-        Drain the subprocesses output streams, writing the collected output
-        to the result. If a writer thread (writing to the subprocess) is given,
-        make sure it's joined before returning. If a stdin stream is given,
-        close it before returning.
-        """
-        stderr = _wrap_output(process.stderr)
-        rr = threading.Thread(target=self._read_response, args=(stderr, 
result))
-        rr.setDaemon(True)
-        logger.debug('stderr reader: %r', rr)
-        rr.start()
-
-        stdout = process.stdout # _wrap_output(process.stdout)
-        dr = threading.Thread(target=self._read_data, args=(stdout, result))
-        dr.setDaemon(True)
-        logger.debug('stdout reader: %r', dr)
-        dr.start()
-
-        dr.join()
-        rr.join()
-        if writer is not None:
-            writer.join()
-        process.wait()
-        if stdin is not None:
-            try:
-                stdin.close()
-            except IOError:
-                pass
-        stderr.close()
-        stdout.close()
-
-    def _handle_io(self, args, file, result, passphrase=None, binary=False):
-        "Handle a call to GPG - pass input data, collect output data"
-        # Handle a basic data call - pass data to GPG, handle the output
-        # including status information. Garbage In, Garbage Out :)
-        p = self._open_subprocess(args, passphrase is not None)
-        if not binary and not isinstance(file, BufferedReader):
-            stdin = _wrap_input(p.stdin)
-        else:
-            stdin = p.stdin
-        if passphrase:
-            _write_passphrase(stdin, passphrase, self.encoding)
-        writer = _threaded_copy_data(file, stdin)
-        self._collect_output(p, result, writer, stdin)
-        return result
-
-    #
-    # SIGNATURE METHODS
-    #
-    def sign(self, message, **kwargs):
-        """sign message"""
-        f = _make_binary_stream(message, self.encoding)
-        result = self.sign_file(f, **kwargs)
-        f.close()
-        return result
-
-    def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
-                  detach=False, binary=False):
-        """sign file"""
-        logger.debug("sign_file: %s", file)
-        if binary:
-            args = ['-s']
-        else:
-            args = ['-sa']
-        # You can't specify detach-sign and clearsign together: gpg ignores
-        # the detach-sign in that case.
-        if detach:
-            args.append("--detach-sign")
-        elif clearsign:
-            args.append("--clearsign")
-        if keyid:
-            args.append("--default-key %s" % keyid)
-        result = Sign(self.encoding)
-        #We could use _handle_io here except for the fact that if the
-        #passphrase is bad, gpg bails and you can't write the message.
-        #self._handle_io(args, _make_file(message), result, 
passphrase=passphrase)
-        p = self._open_subprocess(args, passphrase is not None)
-        try:
-            stdin = p.stdin
-            if passphrase:
-                _write_passphrase(stdin, passphrase, self.encoding)
-            writer = _threaded_copy_data(file, stdin)
-        except IOError:
-            logging.exception("error writing message")
-            writer = None
-        self._collect_output(p, result, writer, stdin)
-        return result
-
-    def verify(self, data):
-        """Verify the signature on the contents of the string 'data'
-
-        >>> gpg = GPG(gnupghome="keys")
-        >>> input = gpg.gen_key_input(Passphrase='foo')
-        >>> key = gpg.gen_key(input)
-        >>> assert key
-        >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
-        >>> assert not sig
-        >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
-        >>> assert sig
-        >>> verify = gpg.verify(sig.data)
-        >>> assert verify
-
-        """
-        f = _make_binary_stream(data, self.encoding)
-        result = self.verify_file(f)
-        f.close()
-        return result
-
-    def verify_file(self, file, data_filename=None):
-        "Verify the signature on the contents of the file-like object 'file'"
-        logger.debug('verify_file: %r, %r', file, data_filename)
-        result = Verify()
-        args = ['--verify']
-        if data_filename is None:
-            self._handle_io(args, file, result, binary=True)
-        else:
-            logger.debug('Handling detached verification')
-            import tempfile
-            fd, fn = tempfile.mkstemp(prefix='pygpg')
-            s = file.read()
-            file.close()
-            logger.debug('Wrote to temp file: %r', s)
-            os.write(fd, s)
-            os.close(fd)
-            args.append(fn)
-            args.append(data_filename)
-            try:
-                p = self._open_subprocess(args)
-                self._collect_output(p, result, stdin=p.stdin)
-            finally:
-                os.unlink(fn)
-        return result
-
-    #
-    # KEY MANAGEMENT
-    #
-
-    def import_keys(self, key_data):
-        """ import the key_data into our keyring
-
-        >>> import shutil
_______________________________________________
Commits mailing list
[email protected]
http://lists.gajim.org/cgi-bin/listinfo/commits

Reply via email to