Ejegg has uploaded a new change for review. https://gerrit.wikimedia.org/r/307679
Change subject: Move antifraud queue off ActiveMQ ...................................................................... Move antifraud queue off ActiveMQ Requirements: * Mirror antifraud queue to redis (in both DI and SmashPig) Bug: T131273 Change-Id: Ida1a73bc8a2484a6ecf99bc8d70a0e5d69b1233a --- A sites/all/modules/queue2civicrm/fredge/AntifraudQueueConsumer.php D sites/all/modules/queue2civicrm/fredge/test/data/payments-antifraud.json D sites/all/modules/queue2civicrm/fredge/test/data/payments-init.json M sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.info M sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module D sites/all/modules/queue2civicrm/tests/data/pending_amazon.json D sites/all/modules/queue2civicrm/tests/data/pending_astropay.json D sites/all/modules/queue2civicrm/tests/data/sparse_donation_amazon.json D sites/all/modules/queue2civicrm/tests/data/sparse_donation_astropay.json 9 files changed, 134 insertions(+), 235 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/crm refs/changes/79/307679/1 diff --git a/sites/all/modules/queue2civicrm/fredge/AntifraudQueueConsumer.php b/sites/all/modules/queue2civicrm/fredge/AntifraudQueueConsumer.php new file mode 100644 index 0000000..0e5dfb0 --- /dev/null +++ b/sites/all/modules/queue2civicrm/fredge/AntifraudQueueConsumer.php @@ -0,0 +1,114 @@ +<?php namespace queue2civicrm\fredge; + +use FredgeDataValidationException; +use wmf_common\WmfQueueConsumer; +use WmfException; + +class AntifraudQueueConsumer extends WmfQueueConsumer { + + /** + * Validate and store messages from the payments-antifraud queue + * + * @param array $message + * @throws WmfException + */ + function processMessage( $message ) { + $id = "{$message['gateway']}-{$message['order_id']}"; + watchdog( + 'fredge', + "Beginning processing of payments-antifraud message for $id: " . + json_encode( $message ), + array(), + WATCHDOG_INFO + ); + + // handle the IP address conversion to binary so we can do database voodoo later. + if ( array_key_exists( 'user_ip', $message ) ) { + // check for IPv6 + if ( strpos( ':', $message['user_ip'] ) !== false ) { + /** + * despite a load of documentation to the contrary, the following line + * ***doesn't work at all***. + * Which is okay for now: We force IPv4 on payments. + * @TODO eventually: Actually handle IPv6 here. + */ + // $message['user_ip'] = inet_pton($message['user_ip']); + + watchdog( + 'fredge', + 'Weird. Somehow an ipv6 address got through on payments. ' . + "Caught in antifraud consumer. $id", + array(), + WATCHDOG_WARNING + ); + $message['user_ip'] = 0; + } else { + $message['user_ip'] = ip2long( $message['user_ip'] ); + } + } + + $this->insertAntifraudData( $message, $id ); + } + + /** + * take a message and insert or update rows in payments_fraud and payments_fraud_breakdown. + * If there is not yet an antifraud row for this ct_id and order_id, all fields + * in the table must be present in the message. + * @param array $msg the message that you want to upsert. + * @param string $logIdentifier Some small string for the log that will help id + * the message if something goes amiss and we have to log about it. + * @throws FredgeDataValidationException + */ + protected function insertAntifraudData( $msg, $logIdentifier ) { + + if ( empty( $msg ) || empty( $msg['contribution_tracking_id'] ) || empty( $msg['order_id'] ) ) { + $error = "$logIdentifier: missing essential payments_fraud IDs. Dropping message on floor."; + throw new FredgeDataValidationException( $error ); + } + + $id = 0; + $inserting = true; + + $dbs = wmf_civicrm_get_dbs(); + $dbs->push( 'fredge' ); + $query = 'SELECT id FROM payments_fraud WHERE contribution_tracking_id = :ct_id AND order_id = :order_id LIMIT 1'; + $result = db_query( $query, array( + ':ct_id' => $msg['contribution_tracking_id'], + ':order_id' => $msg['order_id'] + ) ); + if ( $result->rowCount() === 1 ) { + $id = $result->fetch()->id; + $inserting = false; + } + $data = fredge_prep_data( $msg, 'payments_fraud', $logIdentifier, $inserting ); + //now all you have to do is insert the actual message data. + if ( $inserting ) { + $id = db_insert( 'payments_fraud' ) + ->fields( $data ) + ->execute(); + } else { + db_update( 'payments_fraud' ) + ->fields( $data ) + ->condition( 'id', $id ) + ->execute(); + } + if ( $id ) { + foreach ( $msg['score_breakdown'] as $test => $score ) { + $breakdown = array( + 'payments_fraud_id' => $id, + 'filter_name' => $test, + 'risk_score' => $score, + ); + // validate the data. none of these fields would be converted, so no need + // to store the output + fredge_prep_data( $breakdown, 'payments_fraud_breakdown', $logIdentifier, true ); + db_merge( 'payments_fraud_breakdown' )->key( array( + 'payments_fraud_id' => $id, + 'filter_name' => $test, + ) )->fields( array( + 'risk_score' => $score, + ) )->execute(); + } + } + } +} diff --git a/sites/all/modules/queue2civicrm/fredge/test/data/payments-antifraud.json b/sites/all/modules/queue2civicrm/fredge/test/data/payments-antifraud.json deleted file mode 100644 index 5472480..0000000 --- a/sites/all/modules/queue2civicrm/fredge/test/data/payments-antifraud.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "validation_action": "process", - "risk_score": "0.3", - "score_breakdown": { - "initial": "0", - "getCVVResult": "0.2", - "getAVSResult": "0", - "getScoreCountryMap": "0", - "getScoreUtmCampaignMap": "0", - "getScoreEmailDomainMap": "0", - "minfraud_filter": "0.10", - "IPVelocityFilter": "0" - }, - "php-message-class": "SmashPig\\CrmLink\\Messages\\DonationInterfaceAntifraud", - "user_ip": "1.2.3.4", - "freeform": "1", - "gateway_txn_id": "", - "correlation-id": "testgateway-28713751.0", - "date": "1445990975", - "server": "testpayments1001", - "gateway": "testgateway", - "contribution_tracking_id": "28713751", - "order_id": "28713751.0", - "payment_method": "cc" -} diff --git a/sites/all/modules/queue2civicrm/fredge/test/data/payments-init.json b/sites/all/modules/queue2civicrm/fredge/test/data/payments-init.json deleted file mode 100644 index d586a02..0000000 --- a/sites/all/modules/queue2civicrm/fredge/test/data/payments-init.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "php-message-class": "SmashPig\\CrmLink\\Messages\\DonationInterfaceFinalStatus", - "contribution_tracking_id": "12345", - "gateway": "testgateway", - "order_id": "", - "gateway_txn_id": "", - "validation_action": "process", - "payments_final_status": "complete", - "payment_method": "paypal", - "payment_submethod": "", - "country": "IT", - "amount": "2.00", - "currency_code": "EUR", - "server": "testpayments1002", - "date": "1445990999" -} diff --git a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.info b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.info index 7437b44..9765838 100644 --- a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.info +++ b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.info @@ -4,3 +4,4 @@ package = queue2civicrm configure = admin/config/queue2civicrm/fredge_qc dependencies[] = queue2civicrm +files[] = AntifraudQueueConsumer.php diff --git a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module index 0fb56f7..3e74503 100644 --- a/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module +++ b/sites/all/modules/queue2civicrm/fredge/wmf_fredge_qc.module @@ -1,4 +1,5 @@ <?php +use queue2civicrm\fredge\AntifraudQueueConsumer; /** * Implements hook_menu @@ -39,10 +40,10 @@ $form['fredge_payments_antifraud_queue'] = array( '#type' => 'textfield', - '#title' => t('Payments-antifraud subscription path'), + '#title' => t('Payments-antifraud queue name'), '#required' => TRUE, - '#default_value' => variable_get('fredge_payments_antifraud_queue', '/queue/payments-antifraud'), - '#description' => t('Queue for payments-antifraud items'), + '#default_value' => variable_get('fredge_payments_antifraud_queue', 'payments-antifraud'), + '#description' => t('Config key under data-store for payments-antifraud queue'), ); // payments-antifraud @@ -67,27 +68,23 @@ civicrm_initialize(); - $dequeue_params = array( - 'init' => array( - 'queue' => variable_get('fredge_payments_init_queue', '/queue/payments-init'), - 'callback' => 'fredge_payments_init_process_message' - ), - 'fraud' => array( - 'queue' => variable_get('fredge_payments_antifraud_queue', '/queue/payments-antifraud'), - 'callback' => 'fredge_payments_antifraud_process_message' - ), + //Let's start with the simplest possible division of labor + $cycle_time = variable_get('fredge_batch_time', 0) / 2; + + $processed = queue2civicrm_stomp()->dequeue_loop( + variable_get('fredge_payments_init_queue', '/queue/payments-init'), + false, + $cycle_time, + 'fredge_payments_init_process_message' ); - //Let's start with the simplest possible division of labor - $cycle_time = variable_get('fredge_batch_time', 0) / count($dequeue_params); + $fraudQueueConsumer = new AntifraudQueueConsumer( + variable_get('fredge_payments_antifraud_queue', 'payments-antifraud'), + false, + $cycle_time + ); - $processed = 0; - foreach ($dequeue_params as $type => $params) { - $processed += queue2civicrm_stomp()->dequeue_loop( - $params['queue'], false, //not using the queue count limiter anymore - $cycle_time, $params['callback'] - ); - } + $processed += $fraudQueueConsumer->dequeueMessages(); if ($processed > 0) { watchdog('fredge', 'Successfully processed ' . $processed . ' fredge message(s).'); @@ -139,100 +136,6 @@ } /** - * Processes an individual payments-antifraud message. - * - * @param $msg A STOMP message class. - * - */ -function fredge_payments_antifraud_process_message($msg) { - $txnid = $msg->headers['correlation-id']; - watchdog('fredge', "Beginning processing of payments-antifraud message for $txnid: " . json_encode($msg), array(), WATCHDOG_INFO); - - $body = json_decode($msg->body, TRUE); - - //handle the IP address conversion to binary so we can do database voodoo later. - if (array_key_exists('user_ip', $body)) { - //check for IPv6 - if ( strpos(':', $body['user_ip']) !== false ){ - /** despite a load of documentation to the contrary, the following line - * ***doesn't work at all***. - * Which is okay for now: We force IPv4 on payments. - * @TODO eventually: Actually handle IPv6 here. - */ - -// $body['user_ip'] = inet_pton($body['user_ip']); - - watchdog('fredge', "Weird. Somehow an ipv6 address got through on payments. Caught in antifraud consumer. $txnid"); - $body['user_ip'] = 0; - } else { - $body['user_ip'] = ip2long($body['user_ip']); - } - } - - fredge_insert_antifraud_data($body, $txnid); -} - -/** - * take a message and insert or update rows in payments_fraud and payments_fraud_breakdown. - * If there is not yet an antifraud row for this ct_id and order_id, all fields - * in the table must be present in the message. - * @param array $msg the array-converted message body that you want to upsert. - * @param string $log_identifier Some small string for the log that will help id - * the message if something goes amiss and we have to log about it. - */ -function fredge_insert_antifraud_data($msg, $log_identifier) { - - if ( empty($msg) || empty($msg['contribution_tracking_id']) || empty($msg['order_id']) ){ - $error = "$log_identifier: missing essential payments_fraud IDs. Dropping message on floor."; - throw new FredgeDataValidationException( $error ); - } - - $id = 0; - $inserting = true; - - $dbs = wmf_civicrm_get_dbs(); - $dbs->push('fredge'); - $query = 'SELECT id FROM payments_fraud WHERE contribution_tracking_id = :ct_id AND order_id = :order_id LIMIT 1'; - $result = db_query( $query, array( - ':ct_id' => $msg['contribution_tracking_id'], - ':order_id' => $msg['order_id'] - ) ); - if ( $result->rowCount() === 1 ){ - $id = $result->fetch()->id; - $inserting = false; - } - $data = fredge_prep_data( $msg, 'payments_fraud', $log_identifier, $inserting ); - //now all you have to do is insert the actual message data. - if ( $inserting ) { - $id = db_insert( 'payments_fraud' ) - ->fields($data) - ->execute(); - } else { - db_update( 'payments_fraud' ) - ->fields($data) - ->condition('id', $id) - ->execute(); - } - if ($id) { - foreach ($msg['score_breakdown'] as $test => $score) { - $breakdown = array( - 'payments_fraud_id' => $id, - 'filter_name' => $test, - 'risk_score' => $score, - ); - // validate the data. none of these fields would be converted, so no need - // to store the output - fredge_prep_data( $breakdown, 'payments_fraud_breakdown', $log_identifier, true ); - db_merge( 'payments_fraud_breakdown' )->key( array( - 'payments_fraud_id' => $id, - 'filter_name' => $test, - ) )->fields( array ( - 'risk_score' => $score, - ) )->execute(); - } - } -} -/** * Check a message against a table schema, and complain if it doesn't fit. Also * format date fields and discard fields with no matching column. * @param array $msg the array-converted message body @@ -256,7 +159,7 @@ //and then, because we only care about the stuff we're inserting, remove the 'type' => 'serial' fields, because autonumber foreach ($schemata as $schema_table => $schema) { foreach ($schema['fields'] as $field => $definition) { - if ($definition['type'] === 'serial') { + if (isset($definition['type']) && $definition['type'] === 'serial') { unset($schemata[$schema_table]['fields'][$field]); } } diff --git a/sites/all/modules/queue2civicrm/tests/data/pending_amazon.json b/sites/all/modules/queue2civicrm/tests/data/pending_amazon.json deleted file mode 100644 index e879a15..0000000 --- a/sites/all/modules/queue2civicrm/tests/data/pending_amazon.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - "city": "cc", - "contribution_tracking_id": "9876512345", - "country": "IL", - "currency": "PLN", - "date": 1234567, - "email": "test+...@local.net", - "fee": "0", - "first_name": "firrst", - "gateway_account": "default", - "gateway": "amazon", - "gross": "10.00", - "language": "en", - "last_name": "laast", - "order_id": "9876512345-1", - "payment_method": "amazon", - "payment_submethod": "amazon", - "postal_code": "11122", - "response": "Original Response Status (pre-SET_PAYMENT): 600", - "state_province": "Haifa", - "street_address": "ss", - "supplemental_address_1": "", - "user_ip": "127.0.0.2", - "utm_campaign": "C13_blah_blah", - "utm_medium": "sidebar", - "utm_source": "..cc" -} diff --git a/sites/all/modules/queue2civicrm/tests/data/pending_astropay.json b/sites/all/modules/queue2civicrm/tests/data/pending_astropay.json deleted file mode 100644 index 3025619..0000000 --- a/sites/all/modules/queue2civicrm/tests/data/pending_astropay.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - "city": "", - "contribution_tracking_id": "11451123", - "country": "BR", - "currency": "BRL", - "date": 1234567, - "email": "test+...@local.net", - "fee": "0", - "first_name": "firrst", - "gateway_account": "default", - "gateway": "astropay", - "gross": "20.00", - "language": "pt", - "last_name": "laast", - "order_id": "11451123.1", - "payment_method": "cc", - "payment_submethod": "mc", - "postal_code": "11122", - "response": "7", - "state_province": "", - "street_address": "N0ne provided", - "supplemental_address_1": "", - "user_ip": "127.0.0.2", - "utm_campaign": "TestyTestCampaign", - "utm_medium": "sidebar", - "utm_source": "..cc" -} diff --git a/sites/all/modules/queue2civicrm/tests/data/sparse_donation_amazon.json b/sites/all/modules/queue2civicrm/tests/data/sparse_donation_amazon.json deleted file mode 100644 index aa7dfac..0000000 --- a/sites/all/modules/queue2civicrm/tests/data/sparse_donation_amazon.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "completion_message_id": "amazon-9876512345-1", - "contribution_tracking_id": "9876512345", - "currency": "PLN", - "date": 1234567, - "fee": "0.15", - "gateway": "amazon", - "gateway_status": "Completed", - "gateway_txn_id": "3611204184", - "gross": "10.00", - "order_id": "9876512345-1", - "payment_method": "amazon" -} diff --git a/sites/all/modules/queue2civicrm/tests/data/sparse_donation_astropay.json b/sites/all/modules/queue2civicrm/tests/data/sparse_donation_astropay.json deleted file mode 100644 index dc97392..0000000 --- a/sites/all/modules/queue2civicrm/tests/data/sparse_donation_astropay.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "completion_message_id": "astropay-11451123.1", - "contribution_tracking_id": "11451123", - "currency": "BRL", - "date": 1234567, - "gateway": "astropay", - "gateway_txn_id": "44559911", - "gateway_status": "7", - "gross": "20.00", - "order_id": "11451123.1" -} -- To view, visit https://gerrit.wikimedia.org/r/307679 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ida1a73bc8a2484a6ecf99bc8d70a0e5d69b1233a Gerrit-PatchSet: 1 Gerrit-Project: wikimedia/fundraising/crm Gerrit-Branch: deployment Gerrit-Owner: Ejegg <eeggles...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits