Awight has uploaded a new change for review. https://gerrit.wikimedia.org/r/207028
Change subject: DO NOT MERGE and the orphan slayer, too ...................................................................... DO NOT MERGE and the orphan slayer, too Will be squashed into the previous commit. Change-Id: I5fff9730ce6607146701b6c3ed0d1db1a11c9591 --- M gateway_common/gateway.adapter.php M globalcollect_gateway/scripts/orphans.php 2 files changed, 102 insertions(+), 349 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface refs/changes/28/207028/1 diff --git a/gateway_common/gateway.adapter.php b/gateway_common/gateway.adapter.php index 987061d..ced03f5 100644 --- a/gateway_common/gateway.adapter.php +++ b/gateway_common/gateway.adapter.php @@ -1869,7 +1869,7 @@ * * TODO: Stop saying "STOMP". */ - protected function getStompTransaction( $recoverTimestamp = false ) { + protected function getStompTransaction( $antiMessage = false, $recoverTimestamp = false ) { $transaction = array( 'gateway_txn_id' => $this->getTransactionGatewayTxnID(), 'payment_method' => $this->getData_Unstaged_Escaped( 'payment_method' ), @@ -1890,19 +1890,19 @@ // in case the transaction already had keys with those values $transaction = array_merge( $stomp_data, $transaction ); - // And now determine the date; which is annoyingly not as easy as one would like it - // if we're attempting to recover some data: ie: we're an orphan - // FIXME: Can't we make this the default? - $timestamp = null; - if ( $recoverTimestamp === true ) { - if ( !is_null( $this->getData_Unstaged_Escaped( 'date' ) ) ) { - $timestamp = $this->getData_Unstaged_Escaped( 'date' ); - } elseif ( !is_null( $this->getData_Unstaged_Escaped( 'ts' ) ) ) { - // That this works is mildly surprising - $timestamp = strtotime( $this->getData_Unstaged_Escaped( 'ts' ) ); + // And now determine the date; which is annoyingly not as easy as one would like it + // if we're attempting to recover some data: ie: we're an orphan + $timestamp = null; + if ( $recoverTimestamp === true ) { + if ( !is_null( $this->getData_Unstaged_Escaped( 'date' ) ) ) { + $timestamp = $this->getData_Unstaged_Escaped( 'date' ); + } elseif ( !is_null( $this->getData_Unstaged_Escaped( 'ts' ) ) ) { + // That this works is mildly surprising + $timestamp = strtotime( $this->getData_Unstaged_Escaped( 'ts' ) ); + } } + $transaction['date'] = ( $timestamp === null ) ? time() : $timestamp; } - $transaction['date'] = ( $timestamp === null ) ? time() : $timestamp; return $transaction; } diff --git a/globalcollect_gateway/scripts/orphans.php b/globalcollect_gateway/scripts/orphans.php index 3e53492..676bc94 100644 --- a/globalcollect_gateway/scripts/orphans.php +++ b/globalcollect_gateway/scripts/orphans.php @@ -13,14 +13,13 @@ require_once( "$IP/maintenance/Maintenance.php" ); class GlobalCollectOrphanRectifier extends Maintenance { - - protected $killfiles = array(); - protected $order_ids = array(); - protected $max_per_execute = 500; //only really used if you're going by-file. - protected $target_execute_time = 30; //(seconds) - only used by the stomp option. protected $adapter; - - function execute(){ + protected $logger; + protected $handled_ids = array(); + protected $removed_message_count; + protected $start_time; + + public function execute() { //have to turn this off here, until we know it's using the user's ip, and //not 127.0.0.1 during the batch process. global $wgDonationInterfaceEnableIPVelocityFilter; @@ -29,26 +28,8 @@ return; } $wgDonationInterfaceEnableIPVelocityFilter = false; - - $func = 'parse_files'; - if ( $this->getOrphanGlobal( 'override_command_line_params' ) ){ - //do that - $func = $this->getOrphanGlobal( 'function' ); - $this->target_execute_time = $this->getOrphanGlobal( 'target_execute_time' ); - $this->max_per_execute = $this->getOrphanGlobal( 'max_per_execute' ); - } else { - if ( !empty( $_SERVER['argv'][1] ) ){ - if ( $_SERVER['argv'][1] === 'stomp' ){ - $func = 'orphan_stomp'; - if ( !empty( $_SERVER['argv'][2] ) && is_numeric( $_SERVER['argv'][2] ) ){ - $this->target_execute_time = $_SERVER['argv'][2]; - } - } elseif ( is_numeric( $_SERVER['argv'][1] ) ){ - $this->max_per_execute = $_SERVER['argv'][1]; - } - } - } - + + // FIXME: Is this just to trigger batch mode? $data = array( 'wheeee' => 'yes' ); @@ -56,15 +37,10 @@ $this->logger = DonationLoggerFactory::getLogger( $this->adapter ); //Now, actually do the processing. - if ( method_exists( $this, $func ) ) { - $this->{$func}(); - } else { - echo "There's no $func in Orphan Rectifying!\n"; - die(); - } + $this->orphan_stomp(); } - - function orphan_stomp(){ + + protected function orphan_stomp(){ echo "Orphan Stomp\n"; $this->removed_message_count = 0; $this->start_time = time(); @@ -74,25 +50,9 @@ //Then, we go back and pull more... and that same one is in the list again. We should stop after one try per message per execute. //We should also be smart enough to not process things we believe we just deleted. $this->handled_ids = array(); - - //first, we need to... clean up the limbo queue. - - //building in some redundancy here. - $collider_keepGoing = true; - $am_called_count = 0; - while ( $collider_keepGoing ){ - $antimessageCount = $this->handleStompAntiMessages(); - $am_called_count += 1; - if ( $antimessageCount < 10 ){ - $collider_keepGoing = false; - } else { - sleep(2); //two seconds. - } - } - $this->logger->info( 'Removed ' . $this->removed_message_count . ' messages and antimessages.' ); - + if ( $this->keepGoing() ){ - //Pull a batch of CC orphans, keeping in mind that Things May Have Happened in the small slice of time since we handled the antimessages. + //Pull a batch of CC orphans $orphans = $this->getStompOrphans(); while ( count( $orphans ) && $this->keepGoing() ){ echo count( $orphans ) . " orphans left this batch\n"; @@ -100,24 +60,22 @@ foreach ( $orphans as $correlation_id => $orphan ) { //process if ( $this->keepGoing() ){ + // TODO: Maybe we can simplify by checking that modified time < job start time. echo "Attempting to rectify orphan $correlation_id\n"; if ( $this->rectifyOrphan( $orphan ) ){ - $this->addStompCorrelationIDToAckBucket( $correlation_id ); + $this->ackMessage( $correlation_id ); $this->handled_ids[$correlation_id] = 'rectified'; } else { $this->handled_ids[$correlation_id] = 'error'; } } } - $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. if ( $this->keepGoing() ){ $orphans = $this->getStompOrphans(); } } } - $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. - //TODO: Make stats squirt out all over the place. $am = 0; $rec = 0; @@ -125,9 +83,6 @@ $fe = 0; foreach( $this->handled_ids as $id=>$whathappened ){ switch ( $whathappened ){ - case 'antimessage' : - $am += 1; - break; case 'rectified' : $rec += 1; break; @@ -140,7 +95,6 @@ } } $final = "\nDone! Final results: \n"; - $final .= " $am destroyed via antimessage (called $am_called_count times) \n"; $final .= " $rec rectified orphans \n"; $final .= " $err errored out \n"; $final .= " $fe false orphans caught \n"; @@ -154,9 +108,9 @@ echo $final; } - function keepGoing(){ + protected function keepGoing(){ $elapsed = $this->getProcessElapsed(); - if ( $elapsed < $this->target_execute_time ){ + if ( $elapsed < $this->getOrphanGlobal( 'target_execute_time' ) ){ return true; } else { return false; @@ -168,188 +122,69 @@ * the cronspammer. * @return int elapsed time since start in seconds */ - function getProcessElapsed(){ + protected function getProcessElapsed(){ $elapsed = time() - $this->start_time; echo "Elapsed Time: $elapsed\n"; return $elapsed; } - function addStompCorrelationIDToAckBucket( $correlation_id, $ackNow = false ){ - static $bucket = array(); - $count = 50; //sure. Why not? - if ( $correlation_id ) { - $bucket[$correlation_id] = "'$correlation_id'"; //avoiding duplicates. - $this->handled_ids[$correlation_id] = 'antimessage'; - } - if ( count( $bucket ) && ( count( $bucket ) >= $count || $ackNow ) ){ - //ack now. - echo 'Acking ' . count( $bucket ) . " bucket messages.\n"; - $selector = 'JMSCorrelationID IN (' . implode( ", ", $bucket ) . ')'; - $ackMe = stompFetchMessages( 'cc-limbo', $selector, $count * 100 ); //This is outrageously high, but I just want to be reasonably sure we get all the matches. - $retrieved_count = count( $ackMe ); - if ( $retrieved_count ){ - stompAckMessages( $ackMe ); - $this->removed_message_count += $retrieved_count; - echo "Done acking $retrieved_count messages. \n"; - } else { - echo "Oh noes! No messages retrieved for $selector...\n"; - } - $bucket = array(); - } - + protected function ackMessage( $correlation_id ) { + $this->handled_ids[$correlation_id] = 'antimessage'; + + DonationQueue::instance()->delete( + $correlation_id, GlobalCollectAdapter::CC_LIMBO_QUEUE ); } - - function handleStompAntiMessages(){ - $selector = "antimessage = 'true' AND gateway='globalcollect'"; - $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 ); - $count = 0; - while ( count( $antimessages ) > 10 && $this->keepGoing() ){ //if there's an antimessage, we can ack 'em all right now. - echo "Colliding " . count( $antimessages ) . " antimessages\n"; - $count += count( $antimessages ); - foreach ( $antimessages as $message ){ - //add the correlation ID to the ack bucket. - if (array_key_exists('correlation-id', $message->headers)) { - $this->addStompCorrelationIDToAckBucket( $message->headers['correlation-id'] ); - } else { - echo 'The STOMP message ' . $message->headers['message-id'] . " has no correlation ID!\n"; - } - } - $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. - $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 ); - } - $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it. - $this->logger->info( "Found $count antimessages." ); - return $count; - } - + /** - * Returns an array of **at most** 300 decoded orphans that we don't think we've rectified yet. - * @return array keys are the correlation_id, and the values are the decoded stomp message body. + * Returns an array of at most $batch_size decoded orphans that we don't + * think we've rectified yet. + * + * @return array keys are the correlation_id, and the values are the + * decoded stomp message body. */ - function getStompOrphans(){ + protected function getStompOrphans(){ + // FIXME: Expiration should be set in configuration, and enforced by + // the queue's native expiry anyway. $time_buffer = 60*20; //20 minutes? Sure. Why not? - $selector = "payment_method = 'cc' AND gateway='globalcollect'"; - echo "Fetching 300 Orphans\n"; - $messages = stompFetchMessages( 'cc-limbo', $selector, 300 ); + $batch_size = 300; + echo "Fetching {$batch_size} Orphans\n"; + + $messages = DonationQueue::instance()->popMultiple( + GlobalCollectAdapter::CC_LIMBO_QUEUE, $batch_size ); + $orphans = array(); - $false_orphans = array(); foreach ( $messages as $message ){ - //this next block will do quite a lot of antimessage collision - //when the queue is not being railed. - if ( array_key_exists('antimessage', $message->headers ) ){ - $correlation_id = $message->headers['correlation-id']; - $false_orphans[] = $correlation_id; - echo "False Orphan! $correlation_id \n"; - } else { - //legit message - if ( !array_key_exists( $message->headers['correlation-id'], $this->handled_ids ) ) { - //check the timestamp to see if it's old enough. - $decoded = json_decode($message->body, true); - if ( array_key_exists( 'date', $decoded ) ){ - $elapsed = $this->start_time - $decoded['date']; - if ( $elapsed > $time_buffer ){ - //we got ourselves an orphan! - $correlation_id = $message->headers['correlation-id']; - $order_id = explode('-', $correlation_id); - $order_id = $order_id[1]; - $decoded['order_id'] = $order_id; - $decoded = unCreateQueueMessage($decoded); - $decoded['card_num'] = ''; - $orphans[$correlation_id] = $decoded; - echo "Found an orphan! $correlation_id \n"; - } + $correlation_id = $message['correlation-id']; + if ( !array_key_exists( $correlation_id, $this->handled_ids ) ) { + //check the timestamp to see if it's old enough. + if ( array_key_exists( 'date', $message ) ) { + $elapsed = $this->start_time - $message['date']; + if ( $elapsed > $time_buffer ){ + //we got ourselves an orphan! + // FIXME: Reuse unstaging function. + $order_id = explode( '-', $correlation_id ); + $order_id = $order_id[1]; + $message['order_id'] = $order_id; + $message = $this->unCreateQueueMessage( $message ); + $message['card_num'] = ''; + $orphans[$correlation_id] = $message; + echo "Found an orphan! $correlation_id \n"; } } } } - - foreach ( $orphans as $cid => $data ){ - if ( in_array( $cid, $false_orphans ) ){ - unset( $orphans[$cid] ); - $this->addStompCorrelationIDToAckBucket( $cid ); - $this->handled_ids[ $cid ] = 'false_orphan'; - } - } - + return $orphans; } - - function parse_files(){ - //all the old stuff goes here. - $order_ids = file( 'orphanlogs/order_ids.txt', FILE_SKIP_EMPTY_LINES ); - foreach ( $order_ids as $key=>$val ){ - $order_ids[$key] = trim( $val ); - } - foreach ( $order_ids as $id ){ - $this->order_ids[$id] = $id; //easier to unset this way. - } - $outstanding_count = count( $this->order_ids ); - echo "Order ID count: $outstanding_count \n"; - - $files = $this->getAllLogFileNames(); - $payments = array(); - foreach ( $files as $file ){ - if ( count( $payments ) < $this->max_per_execute ){ - $file_array = $this->getLogfileLines( $file ); - $payments = array_merge( $this->findTransactionLines( $file_array ), $payments ); - if ( count( $payments ) === 0 ){ - $this->killfiles[] = $file; - echo print_r( $this->killfiles, true ); - } - } - } - - $this->adapter->setCurrentTransaction('INSERT_ORDERWITHPAYMENT'); - $xml = new DomDocument; - - //fields that have generated notices if they're not there. - $additional_fields = array( - 'card_num', - 'utm_medium', - 'utm_campaign', - 'referrer', - ); - - foreach ($payments as $key => $payment_data){ - $xml->loadXML($payment_data['xml']); - $parsed = $this->adapter->getResponseData($xml); - $payments[$key]['parsed'] = $parsed; - $payments[$key]['unstaged'] = $this->adapter->unstage_data($parsed); - $payments[$key]['unstaged']['contribution_tracking_id'] = $payments[$key]['contribution_tracking_id']; - foreach ($additional_fields as $val){ - if (!array_key_exists($val, $payments[$key]['unstaged'])){ - $payments[$key]['unstaged'][$val] = null; - } - } - } - - // ADDITIONAL: log out what you did here, to... somewhere. - // Preferably *before* you rewrite the Order ID file. - //we may need to unset some hooks out here. Anything that requires user interaction would make no sense here. - $i = 0; - foreach($payments as $payment_data){ - if ($i < $this->max_per_execute){ - ++$i; - if ( $this->rectifyOrphan( $payment_data['unstaged'] ) ) { - unset( $this->order_ids[$payment_data['unstaged']['order_id']] ); - } - } - } - - if ($outstanding_count != count($this->order_ids)){ - $this->rewriteOrderIds(); - } - } - /** - * Uses the Orphan Adapter to rectify a single orphan. Returns a boolean letting the caller know if + * Uses the Orphan Adapter to rectify (complete the charge for) a single orphan. Returns a boolean letting the caller know if * the orphan has been fully rectified or not. * @param array $data Some set of orphan data. * @param boolean $query_contribution_tracking A flag specifying if we should query the contribution_tracking table or not. * @return boolean True if the orphan has been rectified, false if not. */ - function rectifyOrphan( $data, $query_contribution_tracking = true ){ + protected function rectifyOrphan( $data, $query_contribution_tracking = true ){ echo 'Rectifying Orphan ' . $data['order_id'] . "\n"; $rectified = false; @@ -382,8 +217,10 @@ /** * Gets the global setting for the key passed in. * @param type $key + * + * FIXME: Reuse GatewayAdapter::getGlobal. */ - function getOrphanGlobal( $key ){ + protected function getOrphanGlobal( $key ){ global $wgDonationInterfaceOrphanCron; if ( array_key_exists( $key, $wgDonationInterfaceOrphanCron ) ){ return $wgDonationInterfaceOrphanCron[$key]; @@ -391,123 +228,39 @@ return NULL; } } - - function getAllLogFileNames(){ - $files = array(); - if ($handle = opendir(dirname(__FILE__) . '/orphanlogs/')){ - while ( ($file = readdir($handle)) !== false ){ - if (trim($file, '.') != '' && $file != 'order_ids.txt' && $file != '.svn'){ - $files[] = dirname(__FILE__) . '/orphanlogs/' . $file; - } + + /** + * Called by the orphan rectifier to change a queue message back into a gateway + * transaction array, basically undoing the mappings from createQueueMessage + * + * @param array $transaction STOMP message + * + * @return array message with queue keys remapped to gateway keys + */ + protected function unCreateQueueMessage( $transaction ) { + // For now, this function assumes that we have a complete queue message. + // TODO: Something more robust and programmatic, as time allows. This whole file is just terrible. + + $rekey = array( + 'first_name' => 'fname', + 'last_name' => 'lname', + 'street_address' => 'street', + 'state_province' => 'state', + 'postal_code' => 'zip', + 'currency' => 'currency_code', + 'gross' => 'amount', + ); + + foreach ( $rekey as $stomp => $di ){ + if ( isset( $transaction[$stomp] ) ){ + $transaction[$di] = $transaction[$stomp]; + unset($transaction[$stomp]); } } - closedir($handle); - return $files; + + return $transaction; } - - function findTransactionLines($file){ - $lines = array(); - $orders = array(); - $contrib_id_finder = array(); - foreach ($file as $line_no=>$line_data){ - if (strpos($line_data, '<XML><REQUEST><ACTION>INSERT_ORDERWITHPAYMENT') === 0){ - $lines[$line_no] = $line_data; - } elseif (strpos($line_data, 'Raw XML Response')){ - $contrib_id_finder[] = $line_data; - } elseif (strpos(trim($line_data), '<ORDERID>') === 0){ - $contrib_id_finder[] = trim($line_data); - } - } - - $order_ids = $this->order_ids; - foreach ($lines as $line_no=>$line_data){ - if (count($orders) < $this->max_per_execute){ - $pos1 = strpos($line_data, '<ORDERID>') + 9; - $pos2 = strpos($line_data, '</ORDERID>'); - if ($pos2 > $pos1){ - $tmp = substr($line_data, $pos1, $pos2-$pos1); - if (isset($order_ids[$tmp])){ - $orders[$tmp] = trim($line_data); - unset($order_ids[$tmp]); - } - } - } - } - - //reverse the array, so we find the last instance first. - $contrib_id_finder = array_reverse($contrib_id_finder); - foreach ($orders as $order_id => $xml){ - $finder = array_search("<ORDERID>$order_id</ORDERID>", $contrib_id_finder); - - //now search forward (which is actually backward) to the "Raw XML" line, so we can get the contribution_tracking_id - //TODO: Some kind of (in)sanity check for this. Just because we've found it one step backward doesn't mean... - //...but it's kind of good. For now. - $explode_me = false; - while (!$explode_me){ - ++$finder; - if (strpos($contrib_id_finder[$finder], "Raw XML Response")){ - $explode_me = $contrib_id_finder[$finder]; - } - } - if (strlen($explode_me)){ - $explode_me = explode(': ', $explode_me); - $contribution_tracking_id = trim($explode_me[1]); - $orders[$order_id] = array( - 'xml' => $xml, - 'contribution_tracking_id' => $contribution_tracking_id, - ); - } - } - - return $orders; - } - - function rewriteOrderIds() { - $file = fopen('orphanlogs/order_ids.txt', 'w'); - $outstanding_orders = implode("\n", $this->order_ids); - fwrite($file, $outstanding_orders); - fclose($file); - } - - function getLogfileLines( $file ){ - $array = file($file, FILE_SKIP_EMPTY_LINES); - //now, check about 50 lines to make sure we're not seeing any of that #012, #015 crap. - $checkcount = 50; - if (count($array) < $checkcount){ - $checkcount = count($array); - } - $convert = false; - for ($i=0; $i<$checkcount; ++$i){ - if( strpos($array[$i], '#012') || strpos($array[$i], '#015') ){ - $convert = true; - break; - } - } - if ($convert) { - $array2 = array(); - foreach ($array as $line){ - if (strpos($line, '#012')){ - $line = str_replace('#012', "\n", $line); - } - if (strpos($line, '#015') ){ - $line = str_replace('#015', "\r", $line); - } - $array2[] = $line; - } - $newfile = implode("\n", $array2); - - $handle = fopen($file, 'w'); - fwrite($handle, $newfile); - fclose($handle); - $array = file($file, FILE_SKIP_EMPTY_LINES); - } - - return $array; - } - } -$maintClass = "GlobalCollectOrphanRectifier"; -require_once( "$IP/maintenance/doMaintenance.php" ); - - +$maintClass = 'GlobalCollectOrphanRectifier'; +require_once RUN_MAINTENANCE_IF_MAIN; -- To view, visit https://gerrit.wikimedia.org/r/207028 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I5fff9730ce6607146701b6c3ed0d1db1a11c9591 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/DonationInterface Gerrit-Branch: master Gerrit-Owner: Awight <awi...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits