Mwalker has submitted this change and it was merged.

Change subject: (FR #751) paypal nightly auditing
......................................................................


(FR #751) paypal nightly auditing

Change-Id: I3ecb4aff2de708f4288312e7caaf4a4403fc8230
---
A audit/paypal/SarFile.py
A audit/paypal/TrrFile.py
A audit/paypal/__init__.py
A audit/paypal/config.yaml.example
A audit/paypal/download_nightly
M audit/paypal/history.py
A audit/paypal/parse_nightly
A audit/paypal/ppreport.py
A audit/paypal/unicode_csv_reader.py
A queue/__init__.py
A queue/stomp_wrap.py
A sftp/__init__.py
A sftp/client.py
13 files changed, 438 insertions(+), 37 deletions(-)

Approvals:
  Mwalker: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/audit/paypal/SarFile.py b/audit/paypal/SarFile.py
new file mode 100644
index 0000000..27c3b3a
--- /dev/null
+++ b/audit/paypal/SarFile.py
@@ -0,0 +1,63 @@
+'''Parser for Paypal Subscription Agreement Report files
+
+See https://www.paypalobjects.com/webstatic/en_US/developer/docs/pdf/PP_LRD_SubscribeAgmntRprt.pdf
+'''
+
+from process.logging import Logger as log
+from queue.stomp_wrap import Stomp
+import ppreport
+
+class SarFile(object):
+    VERSION=2
+    stomp = None
+
+    @staticmethod
+    def handle(path):
+        obj = SarFile(path)
+        obj.parse()
+
+    def __init__(self, path):
+        self.path = path
+
+    def parse(self):
+        ppreport.read(self.path, self.VERSION, self.parse_line)
+
+    def parse_line(self, row):
+        names = row['Subscription Payer Name'].split(' ')
+
+        out = {
+            'subscr_id': row['Subscription ID'],
+            'mc_currency': row['Subscription Currency'],
+            'mc_amount3': float(row['Period 3 Amount']) / 100,
+            'period3': row['Subscription Period 3'],
+            'subscr_date': row['Subscription Creation Date'],
+            'payer_email': row['Subscription Payer email address'],
+            'first_name': names[0],
+            'last_name': " ".join(names[1:]),
+            'address_street': row['Shipping Address Line1'],
+            'address_city': row['Shipping Address City'],
+            'address_zip': row['Shipping Address Zip'],
+            'address_state': row['Shipping Address State'],
+            'address_country_code': row['Shipping Address Country'],
+            'gateway': 'paypal',
+        }
+
+        if row['Subscription Period 3'] != "1 M":
+            raise RuntimeError("Unknown subscription period {period}".format(period=row['Subscription Period 3']))
+
+        if row['Subscription Action Type'] == 'S0000':
+            out['txn_type'] = 'subscr_signup'
+        elif row['Subscription Action Type'] == 'S0100':
+            log.info("Ignoring subscription modification")
+        elif row['Subscription Action Type'] == 'S0200':
+            out['txn_type'] = 'subscr_cancel'
+        elif row['Subscription Action Type'] == 'S0300':
+            out['txn_type'] = 'subscr_eot'
+
+        self.send(out)
+
+    def send(self, msg):
+        if not self.stomp:
+            self.stomp = Stomp()
+
+        self.stomp.send(msg, 'recurring')
diff --git a/audit/paypal/TrrFile.py b/audit/paypal/TrrFile.py
new file mode 100644
index 0000000..1352df6
--- /dev/null
+++ b/audit/paypal/TrrFile.py
@@ -0,0 +1,129 @@
+'''Parser for Transaction Detail Report files
+
+See https://www.paypalobjects.com/webstatic/en_US/developer/docs/pdf/PP_LRD_Gen_TransactionDetailReport.pdf
+'''
+
+from process.logging import Logger as log
+from queue.stomp_wrap import Stomp
+import ppreport
+
+class TrrFile(object):
+    VERSION=4
+    stomp = None
+
+    @staticmethod
+    def handle(path):
+        obj = TrrFile(path)
+        obj.parse()
+
+    def __init__(self, path):
+        self.path = path
+
+    def parse(self):
+        ppreport.read(self.path, self.VERSION, self.parse_line)
+
+    def parse_line(self, row):
+        if row['Billing Address Line1']:
+            addr_prefix = 'Billing Address '
+        else:
+            addr_prefix = 'Shipping Address '
+
+        out = {
+            'gateway_txn_id': row['Transaction ID'],
+            'date': row['Transaction Initiation Date'],
+            'settled_date': row['Transaction Completion Date'],
+            'gross': float(row['Gross Transaction Amount']) / 100.0,
+            'currency': row['Gross Transaction Currency'],
+            'gateway_status': row['Transactional Status'],
+            'gateway': 'paypal',
+            'note': row['Transaction Note'],
+            'email': row['Payer\'s Account ID'],
+
+            'street_address': row[addr_prefix + 'Line1'],
+            'supplemental_address_1': row[addr_prefix + 'Line2'],
+            'city': row[addr_prefix + 'City'],
+            'state_province': row[addr_prefix + 'State'],
+            'postal_code': row[addr_prefix + 'Zip'],
+            'country': row[addr_prefix + 'Country'],
+        }
+
+        if row['Fee Amount']:
+            out['fee'] = float(row['Fee Amount']) / 100.0
+
+            if row['Fee Currency'] and row['Gross Transaction Currency'] != row['Fee Currency']:
+                raise RuntimeError("Failed to import because multiple currencies for one transaction is not handled.")
+
+        if 'Consumer Given Name' in row:
+            out['first_name'] = row['Consumer Given Name']
+
+        if 'Consumer Family Name' in row:
+            out['last_name'] = row['Consumer Family Name']
+
+        if 'Payment Source' in row:
+            out['payment_method'] = row['Payment Source']
+
+        if 'Card Type' in row:
+            out['payment_submethod'] = row['Card Type']
+
+        if row['PayPal Reference ID Type'] == 'SUB':
+            out['subscr_id'] = row['PayPal Reference ID']
+
+        event_type = row['Transaction Event Code'][0:3]
+
+        queue = None
+        if event_type in ('T00', 'T03', 'T05', 'T07', 'T22'):
+            if row['Transaction Event Code'] == 'T0002':
+                queue = 'recurring'
+                out = self.normalize_recurring(out)
+            else:
+                queue = 'donations'
+        elif event_type in ('T11', 'T12'):
+            out['gateway_refund_id'] = out['gateway_txn_id']
+            out['gross_currency'] = out['currency']
+
+            if row['PayPal Reference ID Type'] == 'TXN':
+                out['gateway_parent_id'] = row['PayPal Reference ID']
+
+            if row['Transaction Event Code'] == 'T1106':
+                out['type'] = 'reversal'
+            elif row['Transaction Event Code'] == 'T1107':
+                out['type'] = 'refund'
+            elif row['Transaction Event Code'] == 'T1201':
+                out['type'] = 'chargeback'
+
+            queue = 'refund'
+
+        if queue:
+            self.send(queue, out)
+        else:
+            log.debug("Ignoring event of class {type}".format(type=event_type))
+
+    def send(self, queue, msg):
+        if not self.stomp:
+            self.stomp = Stomp()
+
+        self.stomp.send(msg, queue)
+
+    def normalize_recurring(self, msg):
+        'Synthesize a raw PayPal message'
+
+        if 'fee' not in msg:
+            msg['fee'] = 0
+
+        out = {
+            'gateway': 'paypal',
+            'txn_type': 'subscr_payment',
+            'txn_id': msg['gateway_txn_id'],
+            'subscr_id': msg['subscr_id'],
+            'payment_date': msg['date'],
+            'payer_email': msg['email'],
+            'mc_gross': msg['gross'],
+            'mc_fee': msg['fee'],
+            'address_street': "\n".join([msg['street_address'], msg['supplemental_address_1']]),
+            'address_city': msg['city'],
+            'address_zip': msg['postal_code'],
+            'address_state': msg['state_province'],
+            'address_country_code': msg['country'],
+        }
+
+        return out
diff --git a/audit/paypal/__init__.py b/audit/paypal/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/audit/paypal/__init__.py
diff --git a/audit/paypal/config.yaml.example b/audit/paypal/config.yaml.example
new file mode 100644
index 0000000..8746a5f
--- /dev/null
+++ b/audit/paypal/config.yaml.example
@@ -0,0 +1,20 @@
+# Copy as /etc/fundraising/paypal-audit.yaml, or $HOME/.fundraising/paypal-audit.yaml
+
+sftp:
+    host: reports.paypal.com
+    username: fooness
+    password: "Wicked"
+    host_key: "AAAAB3NzaC1yc2EAAAABIwAAAIEAsWerPjkPXD/EklfIvMKoi+K+DYmNdIJNbuwzm36wWEfGUGdki8+no7PUN3v4iQ2QmYcpiZcxGaUXDs4L6Ko41jtsfVK+DGr8B3p4uWPqJoohIieMYAeKy5dEaAuaa8RVAgaEGa9wi4c1JZFYKgxEqJhJFmpD55VO0DvTmS9JKYM="
+
+archive_path: /home/audit/archive
+incoming_path: /home/audit/incoming
+
+stomp:
+    server: crm.dev
+    port: 61613
+    queues:
+        refund: /queue/refund
+        donations: /queue/donations
+        recurring: /queue/recurring
+
+no_effect: 0
diff --git a/audit/paypal/download_nightly b/audit/paypal/download_nightly
new file mode 100755
index 0000000..836b836
--- /dev/null
+++ b/audit/paypal/download_nightly
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+
+import os, os.path
+
+from process.logging import Logger as log
+
+from process.globals import load_config
+load_config("paypal-audit")
+
+from process.globals import config
+import process.lock as lock
+import sftp.client
+
+remote_root = "ppreports/outgoing"
+
+def walk_files(paths):
+    result = []
+
+    if not hasattr(paths, 'extend'):
+        paths = [paths]
+
+    for root in paths:
+        for dirpath, dirnames, filenames in os.walk(root):
+            result += filenames
+
+    return result
+
+if __name__ == '__main__':
+    log.info("Begin PayPal nightly audit download")
+    lock.begin()
+
+    local_paths = [
+        config.incoming_path,
+        config.archive_path,
+    ]
+    local_files = walk_files(local_paths)
+
+    remote = sftp.client.Client()
+    remote_files = remote.ls(remote_root)
+
+    for filename in remote_files:
+        if filename in local_files:
+            log.info("Skipping already downloaded file {filename}".format(filename=filename))
+            continue
+
+        log.info("Downloading file {filename}".format(filename=filename))
+        dest_path = os.path.join(config.incoming_path, filename)
+        remote.get(os.path.join(remote_root, filename), dest_path)
+
+    lock.end()
+    log.info("End PayPal nightly audit download")
diff --git a/audit/paypal/history.py b/audit/paypal/history.py
index 061e72e..0c65372 100755
--- a/audit/paypal/history.py
+++ b/audit/paypal/history.py
@@ -2,7 +2,7 @@
 
 from ConfigParser import SafeConfigParser
 from optparse import OptionParser
-import stomp
+from queue.stomp_wrap import Stomp
 import time
 import json
 import csv
@@ -171,42 +171,6 @@
     })
 
     return msg
-
-class Stomp(object):
-    def __init__(self, config):
-        host_and_ports = [(config.get('Stomp', 'server'), config.getint('Stomp', 'port'))]
-        self.sc = stomp.Connection(host_and_ports)
-        self.sc.start()
-        self.sc.connect()
-
-    def __del__(self):
-        if self.sc:
-            self.sc.disconnect()
-
-            # Let the STOMP library catch up
-            import time
-            time.sleep(1)
-
-    def send(self, msg, queue_name):
-        global options, config
-
-        if options.noEffect:
-            log("not queueing message. " + json.dumps(msg))
-            return
-
-        headers = {
-            'correlation-id': '%s-%s' % (msg['gateway'], msg['gateway_txn_id']),
-            'destination': config.get('Stomp', '%s-queue' % (queue_name,)),
-            'persistent': 'true',
-        }
-
-        if config.getboolean('Stomp', 'debug'):
-            log("sending %s %s" % (headers, msg))
-
-        self.sc.send(
-            json.dumps(msg),
-            headers
-        )
 
 log_file = None
 
diff --git a/audit/paypal/parse_nightly b/audit/paypal/parse_nightly
new file mode 100755
index 0000000..976359f
--- /dev/null
+++ b/audit/paypal/parse_nightly
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+
+import os, os.path
+import re
+from process.logging import Logger as log
+from process.globals import load_config
+load_config("paypal-audit")
+from process.globals import config
+import process.lock as lock
+
+import TrrFile, SarFile
+
+filename_re = r'''^(?x)
+    (?P<type>[A-Z]{3})-
+    (?P<date>[0-9]{8})[.]
+    (?P<sequence>[0-9]{2})[.]
+    (?P<version>[0-9]{3})[.]
+    CSV
+    $
+'''
+
+handlers = {
+    'TRR': TrrFile.TrrFile.handle,
+    'SAR': SarFile.SarFile.handle,
+}
+
+def parse(path):
+    matches = re.match(filename_re, filename)
+    if matches:
+        filetype = matches.group('type')
+        if filetype in handlers:
+            log.info("Parsing file using {type} handler".format(type=filetype))
+            handlers[filetype](path)
+        else:
+            log.info("No handler for type {type}".format(type=filetype))
+    else:
+        log.error("Could not parse filename: {filename}".format(filename=filename))
+
+def archive(path):
+    filename = os.path.basename(path)
+    dest_path = os.path.join(config.archive_path, filename)
+
+    log.info("Archiving {orig} to {new}".format(orig=path, new=dest_path))
+    os.rename(path, dest_path)
+
+if __name__ == '__main__':
+    lock.begin()
+
+    for filename in os.listdir(config.incoming_path):
+        path = os.path.join(config.incoming_path, filename)
+        if os.path.isfile(path):
+            parse(path)
+            archive(path)
+
+    lock.end()
diff --git a/audit/paypal/ppreport.py b/audit/paypal/ppreport.py
new file mode 100644
index 0000000..28d47b2
--- /dev/null
+++ b/audit/paypal/ppreport.py
@@ -0,0 +1,34 @@
+import io
+
+from unicode_csv_reader import unicode_csv_reader
+
+dialect = dict(
+    delimiter=',',
+    quotechar='"'
+)
+
+def read(path, version, callback):
+    with io.open(path, 'r', encoding='utf-16') as csvfile:
+        plainreader = unicode_csv_reader(csvfile, **dialect)
+        for row in plainreader:
+            if row[0] == 'RH':
+                if int(row[4]) != version:
+                    raise RuntimeError("This file uses an unexpected format revision: {version}".format(version=row[4]))
+            elif row[0] == 'FH':
+                pass
+            elif row[0] == 'SH':
+                pass
+            elif row[0] == 'CH':
+                column_headers = ['Column Type'] + row[1:]
+                break
+            else:
+                raise RuntimeError("Unexpected row type: {type}".format(type=row[0]))
+
+        for line in plainreader:
+            row = dict(zip(column_headers, line))
+            if row['Column Type'] == 'SB':
+                callback(row)
+            elif row['Column Type'] in ('SF', 'SC', 'RF', 'RC', 'FF'):
+                pass
+            else:
+                raise RuntimeError("Section ended and crazy stuff began: {type}".format(type=row['Column Type']))
diff --git a/audit/paypal/unicode_csv_reader.py b/audit/paypal/unicode_csv_reader.py
new file mode 100644
index 0000000..de1d964
--- /dev/null
+++ b/audit/paypal/unicode_csv_reader.py
@@ -0,0 +1,14 @@
+import csv
+
+# from http://docs.python.org/2/library/csv.html
+def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
+    # csv.py doesn't do Unicode; encode temporarily as UTF-8:
+    csv_reader = csv.reader(utf_8_encoder(unicode_csv_data),
+                            dialect=dialect, **kwargs)
+    for row in csv_reader:
+        # decode UTF-8 back to Unicode, cell by cell:
+        yield [unicode(cell, 'utf-8') for cell in row]
+
+def utf_8_encoder(unicode_csv_data):
+    for line in unicode_csv_data:
+        yield line.encode('utf-8')
diff --git a/queue/__init__.py b/queue/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/queue/__init__.py
diff --git a/queue/stomp_wrap.py b/queue/stomp_wrap.py
new file mode 100644
index 0000000..5cf4f08
--- /dev/null
+++ b/queue/stomp_wrap.py
@@ -0,0 +1,38 @@
+from process.globals import config
+from process.logging import Logger as log
+
+import json
+from stompy import Stomp as DistStomp
+
+class Stomp(object):
+    conn = None
+
+    def __init__(self):
+        self.conn = DistStomp(config.stomp.server, config.stomp.port)
+        self.conn.connect()
+
+    def __del__(self):
+        if self.conn:
+            self.conn.disconnect()
+
+            # Let the STOMP library catch up
+            import time
+            time.sleep(1)
+
+    def send(self, msg, queue_key):
+        if config.no_effect:
+            log.info("not queueing message. " + json.dumps(msg))
+            return
+
+        meta = {
+            'destination': config.stomp.queues[queue_key],
+            'persistent': 'true',
+        }
+
+        if 'gateway' in msg and 'gateway_txn_id' in msg:
+            meta['correlation-id'] = '{gw}-{id}'.format(gw=msg['gateway'], id=msg['gateway_txn_id'])
+
+        #log.debug("sending %s %s" % (meta, msg))
+
+        meta.update({'body': json.dumps(msg)})
+        self.conn.send(meta)
diff --git a/sftp/__init__.py b/sftp/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sftp/__init__.py
diff --git a/sftp/client.py b/sftp/client.py
new file mode 100644
index 0000000..71dce41
--- /dev/null
+++ b/sftp/client.py
@@ -0,0 +1,33 @@
+import os, os.path
+import base64
+import paramiko
+
+from process.logging import Logger as log
+from process.globals import config
+
+class Client(object):
+    def __init__(self):
+        self.host_public_key = paramiko.RSAKey(data=base64.decodestring(config.sftp.host_key))
+
+        self.connect()
+
+    def __del__(self):
+        self.client.close()
+
+    def connect(self):
+        log.info("Connecting to {host}".format(host=config.sftp.host))
+        transport = paramiko.Transport((config.sftp.host, 22))
+        transport.connect(username=config.sftp.username, password=config.sftp.password, hostkey=self.host_public_key)
+        self.client = paramiko.SFTPClient.from_transport(transport)
+
+    def ls(self, path):
+        return self.client.listdir(path)
+
+    def get(self, filename, dest_path):
+        try:
+            self.client.get(filename, dest_path)
+        except:
+            if os.path.exists(dest_path):
+                log.info("Removing corrupted download: {path}".format(path=dest_path))
+                os.unlink(dest_path)
+            raise

-- 
To view, visit https://gerrit.wikimedia.org/r/102041
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I3ecb4aff2de708f4288312e7caaf4a4403fc8230
Gerrit-PatchSet: 5
Gerrit-Project: wikimedia/fundraising/tools
Gerrit-Branch: master
Gerrit-Owner: Adamw <awi...@wikimedia.org>
Gerrit-Reviewer: Mwalker <mwal...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to