changeset 4a0e9afc56f3 in /home/hg/repos/gajim
details:http://hg.gajim.org/gajim?cmd=changeset;node=4a0e9afc56f3
description: update python-gnupg to 0.3.0 + add KEYEXPIRED support. Fixes #7151
diffstat:
src/common/gnupg.py | 1190 ++++++++++++++++++++++++++------------------------
src/common/gpg.py | 4 +-
2 files changed, 622 insertions(+), 572 deletions(-)
diffs (truncated from 1364 to 300 lines):
diff -r 3997ab204178 -r 4a0e9afc56f3 src/common/gnupg.py
--- a/src/common/gnupg.py Wed May 09 12:16:48 2012 +0400
+++ b/src/common/gnupg.py Fri May 18 16:58:51 2012 +0200
@@ -27,25 +27,22 @@
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
-Modifications Copyright (C) 2008-2011 Vinay Sajip. All rights reserved.
+Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import locale
+__version__ = "0.3.0"
__author__ = "Vinay Sajip"
-__date__ = "$25-Jan-2011 11:40:48$"
+__date__ = "$12-May-2012 10:49:10$"
try:
from io import StringIO
- from io import TextIOWrapper
- from io import BufferedReader
- from io import BufferedWriter
except ImportError:
from cStringIO import StringIO
- class BufferedReader: pass
- class BufferedWriter: pass
+import codecs
import locale
import logging
import os
@@ -110,34 +107,11 @@
passphrase = '%s\n' % passphrase
passphrase = passphrase.encode(encoding)
stream.write(passphrase)
- logger.debug("Passphrase written")
+ logger.debug("Wrote passphrase: %r", passphrase)
def _is_sequence(instance):
return isinstance(instance,list) or isinstance(instance,tuple)
-def _wrap_input(inp):
- if isinstance(inp, BufferedWriter):
- oldinp = inp
- inp = TextIOWrapper(inp)
- logger.debug('wrapped input: %r -> %r', oldinp, inp)
- return inp
-
-def _wrap_output(outp):
- if isinstance(outp, BufferedReader):
- oldoutp = outp
- outp = TextIOWrapper(outp)
- logger.debug('wrapped output: %r -> %r', oldoutp, outp)
- return outp
-
-#The following is needed for Python2.7 :-(
-def _make_file(s):
- try:
- rv = StringIO(s)
- except (TypeError, UnicodeError):
- from io import BytesIO
- rv = BytesIO(s)
- return rv
-
def _make_binary_stream(s, encoding):
try:
if _py3k:
@@ -152,526 +126,11 @@
rv = StringIO(s)
return rv
-class GPG(object):
- "Encapsulate access to the gpg executable"
- def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False,
use_agent=False):
- """Initialize a GPG process wrapper. Options are:
-
- gpgbinary -- full pathname for GPG binary.
-
- gnupghome -- full pathname to where we can find the public and
- private keyrings. Default is whatever gpg defaults to.
- """
- self.gpgbinary = gpgbinary
- self.gnupghome = gnupghome
- self.verbose = verbose
- self.use_agent = use_agent
- self.encoding = locale.getpreferredencoding()
- if self.encoding is None: # This happens on Jython!
- self.encoding = sys.stdin.encoding
- if gnupghome and not os.path.isdir(self.gnupghome):
- os.makedirs(self.gnupghome,0x1C0)
- p = self._open_subprocess(["--version"])
- result = Verify() # any result will do for this
- self._collect_output(p, result, stdin=p.stdin)
- if p.returncode != 0:
- raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
- result.stderr))
-
- def _open_subprocess(self, args, passphrase=False):
- # Internal method: open a pipe to a GPG subprocess and return
- # the file objects for communicating with it.
- cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
- if self.gnupghome:
- cmd.append('--homedir "%s" ' % self.gnupghome)
- if passphrase:
- cmd.append('--batch --passphrase-fd 0')
- if self.use_agent:
- cmd.append('--use-agent')
- cmd.extend(args)
- cmd = ' '.join(cmd)
- if self.verbose:
- print(cmd)
- logger.debug("%s", cmd)
- return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
-
- def _read_response(self, stream, result):
- # Internal method: reads all the output from GPG, taking notice
- # only of lines that begin with the magic [GNUPG:] prefix.
- #
- # Calls methods on the response object for each valid token found,
- # with the arg being the remainder of the status line.
- lines = []
- while True:
- line = stream.readline()
- lines.append(line)
- if self.verbose:
- print(line)
- logger.debug("%s", line.rstrip())
- if line == "": break
- line = line.rstrip()
- if line[0:9] == '[GNUPG:] ':
- # Chop off the prefix
- line = line[9:]
- L = line.split(None, 1)
- keyword = L[0]
- if len(L) > 1:
- value = L[1]
- else:
- value = ""
- result.handle_status(keyword, value)
- result.stderr = ''.join(lines)
-
- def _read_data(self, stream, result):
- # Read the contents of the file from GPG's stdout
- chunks = []
- while True:
- data = stream.read(1024)
- if len(data) == 0:
- break
- logger.debug("chunk: %r" % data[:256])
- chunks.append(data)
- if _py3k:
- # Join using b'' or '', as appropriate
- result.data = type(data)().join(chunks)
- else:
- result.data = ''.join(chunks)
-
- def _collect_output(self, process, result, writer=None, stdin=None):
- """
- Drain the subprocesses output streams, writing the collected output
- to the result. If a writer thread (writing to the subprocess) is given,
- make sure it's joined before returning. If a stdin stream is given,
- close it before returning.
- """
- stderr = _wrap_output(process.stderr)
- rr = threading.Thread(target=self._read_response, args=(stderr,
result))
- rr.setDaemon(True)
- logger.debug('stderr reader: %r', rr)
- rr.start()
-
- stdout = process.stdout # _wrap_output(process.stdout)
- dr = threading.Thread(target=self._read_data, args=(stdout, result))
- dr.setDaemon(True)
- logger.debug('stdout reader: %r', dr)
- dr.start()
-
- dr.join()
- rr.join()
- if writer is not None:
- writer.join()
- process.wait()
- if stdin is not None:
- try:
- stdin.close()
- except IOError:
- pass
- stderr.close()
- stdout.close()
-
- def _handle_io(self, args, file, result, passphrase=None, binary=False):
- "Handle a call to GPG - pass input data, collect output data"
- # Handle a basic data call - pass data to GPG, handle the output
- # including status information. Garbage In, Garbage Out :)
- p = self._open_subprocess(args, passphrase is not None)
- if not binary and not isinstance(file, BufferedReader):
- stdin = _wrap_input(p.stdin)
- else:
- stdin = p.stdin
- if passphrase:
- _write_passphrase(stdin, passphrase, self.encoding)
- writer = _threaded_copy_data(file, stdin)
- self._collect_output(p, result, writer, stdin)
- return result
-
- #
- # SIGNATURE METHODS
- #
- def sign(self, message, **kwargs):
- """sign message"""
- f = _make_binary_stream(message, self.encoding)
- result = self.sign_file(f, **kwargs)
- f.close()
- return result
-
- def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
- detach=False, binary=False):
- """sign file"""
- logger.debug("sign_file: %s", file)
- if binary:
- args = ['-s']
- else:
- args = ['-sa']
- # You can't specify detach-sign and clearsign together: gpg ignores
- # the detach-sign in that case.
- if detach:
- args.append("--detach-sign")
- elif clearsign:
- args.append("--clearsign")
- if keyid:
- args.append("--default-key %s" % keyid)
- result = Sign(self.encoding)
- #We could use _handle_io here except for the fact that if the
- #passphrase is bad, gpg bails and you can't write the message.
- #self._handle_io(args, _make_file(message), result,
passphrase=passphrase)
- p = self._open_subprocess(args, passphrase is not None)
- try:
- stdin = p.stdin
- if passphrase:
- _write_passphrase(stdin, passphrase, self.encoding)
- writer = _threaded_copy_data(file, stdin)
- except IOError:
- logging.exception("error writing message")
- writer = None
- self._collect_output(p, result, writer, stdin)
- return result
-
- def verify(self, data):
- """Verify the signature on the contents of the string 'data'
-
- >>> gpg = GPG(gnupghome="keys")
- >>> input = gpg.gen_key_input(Passphrase='foo')
- >>> key = gpg.gen_key(input)
- >>> assert key
- >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
- >>> assert not sig
- >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
- >>> assert sig
- >>> verify = gpg.verify(sig.data)
- >>> assert verify
-
- """
- f = _make_binary_stream(data, self.encoding)
- result = self.verify_file(f)
- f.close()
- return result
-
- def verify_file(self, file, data_filename=None):
- "Verify the signature on the contents of the file-like object 'file'"
- logger.debug('verify_file: %r, %r', file, data_filename)
- result = Verify()
- args = ['--verify']
- if data_filename is None:
- self._handle_io(args, file, result, binary=True)
- else:
- logger.debug('Handling detached verification')
- import tempfile
- fd, fn = tempfile.mkstemp(prefix='pygpg')
- s = file.read()
- file.close()
- logger.debug('Wrote to temp file: %r', s)
- os.write(fd, s)
- os.close(fd)
- args.append(fn)
- args.append(data_filename)
- try:
- p = self._open_subprocess(args)
- self._collect_output(p, result, stdin=p.stdin)
- finally:
- os.unlink(fn)
- return result
-
- #
- # KEY MANAGEMENT
- #
-
- def import_keys(self, key_data):
- """ import the key_data into our keyring
-
- >>> import shutil
_______________________________________________
Commits mailing list
[email protected]
http://lists.gajim.org/cgi-bin/listinfo/commits