Mwalker has submitted this change and it was merged.

Change subject: (FR #751) paypal nightly auditing
......................................................................
(FR #751) paypal nightly auditing Change-Id: I3ecb4aff2de708f4288312e7caaf4a4403fc8230 --- A audit/paypal/SarFile.py A audit/paypal/TrrFile.py A audit/paypal/__init__.py A audit/paypal/config.yaml.example A audit/paypal/download_nightly M audit/paypal/history.py A audit/paypal/parse_nightly A audit/paypal/ppreport.py A audit/paypal/unicode_csv_reader.py A queue/__init__.py A queue/stomp_wrap.py A sftp/__init__.py A sftp/client.py 13 files changed, 438 insertions(+), 37 deletions(-) Approvals: Mwalker: Looks good to me, approved jenkins-bot: Verified diff --git a/audit/paypal/SarFile.py b/audit/paypal/SarFile.py new file mode 100644 index 0000000..27c3b3a --- /dev/null +++ b/audit/paypal/SarFile.py @@ -0,0 +1,63 @@ +'''Parser for Paypal Subscription Agreement Report files + +See https://www.paypalobjects.com/webstatic/en_US/developer/docs/pdf/PP_LRD_SubscribeAgmntRprt.pdf +''' + +from process.logging import Logger as log +from queue.stomp_wrap import Stomp +import ppreport + +class SarFile(object): + VERSION=2 + stomp = None + + @staticmethod + def handle(path): + obj = SarFile(path) + obj.parse() + + def __init__(self, path): + self.path = path + + def parse(self): + ppreport.read(self.path, self.VERSION, self.parse_line) + + def parse_line(self, row): + names = row['Subscription Payer Name'].split(' ') + + out = { + 'subscr_id': row['Subscription ID'], + 'mc_currency': row['Subscription Currency'], + 'mc_amount3': float(row['Period 3 Amount']) / 100, + 'period3': row['Subscription Period 3'], + 'subscr_date': row['Subscription Creation Date'], + 'payer_email': row['Subscription Payer email address'], + 'first_name': names[0], + 'last_name': " ".join(names[1:]), + 'address_street': row['Shipping Address Line1'], + 'address_city': row['Shipping Address City'], + 'address_zip': row['Shipping Address Zip'], + 'address_state': row['Shipping Address State'], + 'address_country_code': row['Shipping Address Country'], + 'gateway': 'paypal', + } + + if row['Subscription 
Period 3'] != "1 M": + raise RuntimeError("Unknown subscription period {period}".format(period=row['Subscription Period 3'])) + + if row['Subscription Action Type'] == 'S0000': + out['txn_type'] = 'subscr_signup' + elif row['Subscription Action Type'] == 'S0100': + log.info("Ignoring subscription modification") + elif row['Subscription Action Type'] == 'S0200': + out['txn_type'] = 'subscr_cancel' + elif row['Subscription Action Type'] == 'S0300': + out['txn_type'] = 'subscr_eot' + + self.send(out) + + def send(self, msg): + if not self.stomp: + self.stomp = Stomp() + + self.stomp.send(msg, 'recurring') diff --git a/audit/paypal/TrrFile.py b/audit/paypal/TrrFile.py new file mode 100644 index 0000000..1352df6 --- /dev/null +++ b/audit/paypal/TrrFile.py @@ -0,0 +1,129 @@ +'''Parser for Transaction Detail Report files + +See https://www.paypalobjects.com/webstatic/en_US/developer/docs/pdf/PP_LRD_Gen_TransactionDetailReport.pdf +''' + +from process.logging import Logger as log +from queue.stomp_wrap import Stomp +import ppreport + +class TrrFile(object): + VERSION=4 + stomp = None + + @staticmethod + def handle(path): + obj = TrrFile(path) + obj.parse() + + def __init__(self, path): + self.path = path + + def parse(self): + ppreport.read(self.path, self.VERSION, self.parse_line) + + def parse_line(self, row): + if row['Billing Address Line1']: + addr_prefix = 'Billing Address ' + else: + addr_prefix = 'Shipping Address ' + + out = { + 'gateway_txn_id': row['Transaction ID'], + 'date': row['Transaction Initiation Date'], + 'settled_date': row['Transaction Completion Date'], + 'gross': float(row['Gross Transaction Amount']) / 100.0, + 'currency': row['Gross Transaction Currency'], + 'gateway_status': row['Transactional Status'], + 'gateway': 'paypal', + 'note': row['Transaction Note'], + 'email': row['Payer\'s Account ID'], + + 'street_address': row[addr_prefix + 'Line1'], + 'supplemental_address_1': row[addr_prefix + 'Line2'], + 'city': row[addr_prefix + 'City'], + 
'state_province': row[addr_prefix + 'State'], + 'postal_code': row[addr_prefix + 'Zip'], + 'country': row[addr_prefix + 'Country'], + } + + if row['Fee Amount']: + out['fee'] = float(row['Fee Amount']) / 100.0 + + if row['Fee Currency'] and row['Gross Transaction Currency'] != row['Fee Currency']: + raise RuntimeError("Failed to import because multiple currencies for one transaction is not handled.") + + if 'Consumer Given Name' in row: + out['first_name'] = row['Consumer Given Name'] + + if 'Consumer Family Name' in row: + out['last_name'] = row['Consumer Family Name'] + + if 'Payment Source' in row: + out['payment_method'] = row['Payment Source'] + + if 'Card Type' in row: + out['payment_submethod'] = row['Card Type'] + + if row['PayPal Reference ID Type'] == 'SUB': + out['subscr_id'] = row['PayPal Reference ID'] + + event_type = row['Transaction Event Code'][0:3] + + queue = None + if event_type in ('T00', 'T03', 'T05', 'T07', 'T22'): + if row['Transaction Event Code'] == 'T0002': + queue = 'recurring' + out = self.normalize_recurring(out) + else: + queue = 'donations' + elif event_type in ('T11', 'T12'): + out['gateway_refund_id'] = out['gateway_txn_id'] + out['gross_currency'] = out['currency'] + + if row['PayPal Reference ID Type'] == 'TXN': + out['gateway_parent_id'] = row['PayPal Reference ID'] + + if row['Transaction Event Code'] == 'T1106': + out['type'] = 'reversal' + elif row['Transaction Event Code'] == 'T1107': + out['type'] = 'refund' + elif row['Transaction Event Code'] == 'T1201': + out['type'] = 'chargeback' + + queue = 'refund' + + if queue: + self.send(queue, out) + else: + log.debug("Ignoring event of class {type}".format(type=event_type)) + + def send(self, queue, msg): + if not self.stomp: + self.stomp = Stomp() + + self.stomp.send(msg, queue) + + def normalize_recurring(self, msg): + 'Synthesize a raw PayPal message' + + if 'fee' not in msg: + msg['fee'] = 0 + + out = { + 'gateway': 'paypal', + 'txn_type': 'subscr_payment', + 'txn_id': 
msg['gateway_txn_id'], + 'subscr_id': msg['subscr_id'], + 'payment_date': msg['date'], + 'payer_email': msg['email'], + 'mc_gross': msg['gross'], + 'mc_fee': msg['fee'], + 'address_street': "\n".join([msg['street_address'], msg['supplemental_address_1']]), + 'address_city': msg['city'], + 'address_zip': msg['postal_code'], + 'address_state': msg['state_province'], + 'address_country_code': msg['country'], + } + + return out diff --git a/audit/paypal/__init__.py b/audit/paypal/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/audit/paypal/__init__.py diff --git a/audit/paypal/config.yaml.example b/audit/paypal/config.yaml.example new file mode 100644 index 0000000..8746a5f --- /dev/null +++ b/audit/paypal/config.yaml.example @@ -0,0 +1,20 @@ +# Copy as /etc/fundraising/paypal-audit.yaml, or $HOME/.fundraising/paypal-audit.yaml + +sftp: + host: reports.paypal.com + username: fooness + password: "Wicked" + host_key: "AAAAB3NzaC1yc2EAAAABIwAAAIEAsWerPjkPXD/EklfIvMKoi+K+DYmNdIJNbuwzm36wWEfGUGdki8+no7PUN3v4iQ2QmYcpiZcxGaUXDs4L6Ko41jtsfVK+DGr8B3p4uWPqJoohIieMYAeKy5dEaAuaa8RVAgaEGa9wi4c1JZFYKgxEqJhJFmpD55VO0DvTmS9JKYM=" + +archive_path: /home/audit/archive +incoming_path: /home/audit/incoming + +stomp: + server: crm.dev + port: 61613 + queues: + refund: /queue/refund + donations: /queue/donations + recurring: /queue/recurring + +no_effect: 0 diff --git a/audit/paypal/download_nightly b/audit/paypal/download_nightly new file mode 100755 index 0000000..836b836 --- /dev/null +++ b/audit/paypal/download_nightly @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +import os, os.path + +from process.logging import Logger as log + +from process.globals import load_config +load_config("paypal-audit") + +from process.globals import config +import process.lock as lock +import sftp.client + +remote_root = "ppreports/outgoing" + +def walk_files(paths): + result = [] + + if not hasattr(paths, 'extend'): + paths = [paths] + + for root in paths: + for dirpath, dirnames, 
filenames in os.walk(root): + result += filenames + + return result + +if __name__ == '__main__': + log.info("Begin PayPal nightly audit download") + lock.begin() + + local_paths = [ + config.incoming_path, + config.archive_path, + ] + local_files = walk_files(local_paths) + + remote = sftp.client.Client() + remote_files = remote.ls(remote_root) + + for filename in remote_files: + if filename in local_files: + log.info("Skipping already downloaded file {filename}".format(filename=filename)) + continue + + log.info("Downloading file {filename}".format(filename=filename)) + dest_path = os.path.join(config.incoming_path, filename) + remote.get(os.path.join(remote_root, filename), dest_path) + + lock.end() + log.info("End PayPal nightly audit download") diff --git a/audit/paypal/history.py b/audit/paypal/history.py index 061e72e..0c65372 100755 --- a/audit/paypal/history.py +++ b/audit/paypal/history.py @@ -2,7 +2,7 @@ from ConfigParser import SafeConfigParser from optparse import OptionParser -import stomp +from queue.stomp_wrap import Stomp import time import json import csv @@ -171,42 +171,6 @@ }) return msg - -class Stomp(object): - def __init__(self, config): - host_and_ports = [(config.get('Stomp', 'server'), config.getint('Stomp', 'port'))] - self.sc = stomp.Connection(host_and_ports) - self.sc.start() - self.sc.connect() - - def __del__(self): - if self.sc: - self.sc.disconnect() - - # Let the STOMP library catch up - import time - time.sleep(1) - - def send(self, msg, queue_name): - global options, config - - if options.noEffect: - log("not queueing message. 
" + json.dumps(msg)) - return - - headers = { - 'correlation-id': '%s-%s' % (msg['gateway'], msg['gateway_txn_id']), - 'destination': config.get('Stomp', '%s-queue' % (queue_name,)), - 'persistent': 'true', - } - - if config.getboolean('Stomp', 'debug'): - log("sending %s %s" % (headers, msg)) - - self.sc.send( - json.dumps(msg), - headers - ) log_file = None diff --git a/audit/paypal/parse_nightly b/audit/paypal/parse_nightly new file mode 100755 index 0000000..976359f --- /dev/null +++ b/audit/paypal/parse_nightly @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +import os, os.path +import re +from process.logging import Logger as log +from process.globals import load_config +load_config("paypal-audit") +from process.globals import config +import process.lock as lock + +import TrrFile, SarFile + +filename_re = r'''^(?x) + (?P<type>[A-Z]{3})- + (?P<date>[0-9]{8})[.] + (?P<sequence>[0-9]{2})[.] + (?P<version>[0-9]{3})[.] + CSV + $ +''' + +handlers = { + 'TRR': TrrFile.TrrFile.handle, + 'SAR': SarFile.SarFile.handle, +} + +def parse(path): + matches = re.match(filename_re, filename) + if matches: + filetype = matches.group('type') + if filetype in handlers: + log.info("Parsing file using {type} handler".format(type=filetype)) + handlers[filetype](path) + else: + log.info("No handler for type {type}".format(type=filetype)) + else: + log.error("Could not parse filename: {filename}".format(filename=filename)) + +def archive(path): + filename = os.path.basename(path) + dest_path = os.path.join(config.archive_path, filename) + + log.info("Archiving {orig} to {new}".format(orig=path, new=dest_path)) + os.rename(path, dest_path) + +if __name__ == '__main__': + lock.begin() + + for filename in os.listdir(config.incoming_path): + path = os.path.join(config.incoming_path, filename) + if os.path.isfile(path): + parse(path) + archive(path) + + lock.end() diff --git a/audit/paypal/ppreport.py b/audit/paypal/ppreport.py new file mode 100644 index 0000000..28d47b2 --- /dev/null +++ 
b/audit/paypal/ppreport.py @@ -0,0 +1,34 @@ +import io + +from unicode_csv_reader import unicode_csv_reader + +dialect = dict( + delimiter=',', + quotechar='"' +) + +def read(path, version, callback): + with io.open(path, 'r', encoding='utf-16') as csvfile: + plainreader = unicode_csv_reader(csvfile, **dialect) + for row in plainreader: + if row[0] == 'RH': + if int(row[4]) != version: + raise RuntimeError("This file uses an unexpected format revision: {version}".format(version=row[4])) + elif row[0] == 'FH': + pass + elif row[0] == 'SH': + pass + elif row[0] == 'CH': + column_headers = ['Column Type'] + row[1:] + break + else: + raise RuntimeError("Unexpected row type: {type}".format(type=row[0])) + + for line in plainreader: + row = dict(zip(column_headers, line)) + if row['Column Type'] == 'SB': + callback(row) + elif row['Column Type'] in ('SF', 'SC', 'RF', 'RC', 'FF'): + pass + else: + raise RuntimeError("Section ended and crazy stuff began: {type}".format(type=row['Column Type'])) diff --git a/audit/paypal/unicode_csv_reader.py b/audit/paypal/unicode_csv_reader.py new file mode 100644 index 0000000..de1d964 --- /dev/null +++ b/audit/paypal/unicode_csv_reader.py @@ -0,0 +1,14 @@ +import csv + +# from http://docs.python.org/2/library/csv.html +def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs): + # csv.py doesn't do Unicode; encode temporarily as UTF-8: + csv_reader = csv.reader(utf_8_encoder(unicode_csv_data), + dialect=dialect, **kwargs) + for row in csv_reader: + # decode UTF-8 back to Unicode, cell by cell: + yield [unicode(cell, 'utf-8') for cell in row] + +def utf_8_encoder(unicode_csv_data): + for line in unicode_csv_data: + yield line.encode('utf-8') diff --git a/queue/__init__.py b/queue/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/queue/__init__.py diff --git a/queue/stomp_wrap.py b/queue/stomp_wrap.py new file mode 100644 index 0000000..5cf4f08 --- /dev/null +++ b/queue/stomp_wrap.py @@ -0,0 +1,38 @@ 
+from process.globals import config +from process.logging import Logger as log + +import json +from stompy import Stomp as DistStomp + +class Stomp(object): + conn = None + + def __init__(self): + self.conn = DistStomp(config.stomp.server, config.stomp.port) + self.conn.connect() + + def __del__(self): + if self.conn: + self.conn.disconnect() + + # Let the STOMP library catch up + import time + time.sleep(1) + + def send(self, msg, queue_key): + if config.no_effect: + log.info("not queueing message. " + json.dumps(msg)) + return + + meta = { + 'destination': config.stomp.queues[queue_key], + 'persistent': 'true', + } + + if 'gateway' in msg and 'gateway_txn_id' in msg: + meta['correlation-id'] = '{gw}-{id}'.format(gw=msg['gateway'], id=msg['gateway_txn_id']) + + #log.debug("sending %s %s" % (meta, msg)) + + meta.update({'body': json.dumps(msg)}) + self.conn.send(meta) diff --git a/sftp/__init__.py b/sftp/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/sftp/__init__.py diff --git a/sftp/client.py b/sftp/client.py new file mode 100644 index 0000000..71dce41 --- /dev/null +++ b/sftp/client.py @@ -0,0 +1,33 @@ +import os, os.path +import base64 +import paramiko + +from process.logging import Logger as log +from process.globals import config + +class Client(object): + def __init__(self): + self.host_public_key = paramiko.RSAKey(data=base64.decodestring(config.sftp.host_key)) + + self.connect() + + def __del__(self): + self.client.close() + + def connect(self): + log.info("Connecting to {host}".format(host=config.sftp.host)) + transport = paramiko.Transport((config.sftp.host, 22)) + transport.connect(username=config.sftp.username, password=config.sftp.password, hostkey=self.host_public_key) + self.client = paramiko.SFTPClient.from_transport(transport) + + def ls(self, path): + return self.client.listdir(path) + + def get(self, filename, dest_path): + try: + self.client.get(filename, dest_path) + except: + if os.path.exists(dest_path): + 
log.info("Removing corrupted download: {path}".format(path=dest_path)) + os.unlink(dest_path) + raise -- To view, visit https://gerrit.wikimedia.org/r/102041 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I3ecb4aff2de708f4288312e7caaf4a4403fc8230 Gerrit-PatchSet: 5 Gerrit-Project: wikimedia/fundraising/tools Gerrit-Branch: master Gerrit-Owner: Adamw <awi...@wikimedia.org> Gerrit-Reviewer: Mwalker <mwal...@wikimedia.org> Gerrit-Reviewer: jenkins-bot _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits