Awight has uploaded a new change for review. https://gerrit.wikimedia.org/r/207741
Change subject: WIP Stop using STOMP in the orphan slayer ...................................................................... WIP Stop using STOMP in the orphan slayer Change-Id: I36b8ce69f74e6204ded478f4cdae9c23a8573c09 TODO: squash with other anti-STOMP patches --- M gateway_common/gateway.adapter.php M globalcollect_gateway/scripts/orphan_adapter.php M globalcollect_gateway/scripts/orphans.php 3 files changed, 87 insertions(+), 222 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/DonationInterface refs/changes/41/207741/1 diff --git a/gateway_common/gateway.adapter.php b/gateway_common/gateway.adapter.php index 2b7ee0d..f5d65d3 100644 --- a/gateway_common/gateway.adapter.php +++ b/gateway_common/gateway.adapter.php @@ -1894,7 +1894,7 @@ * * TODO: Stop saying "STOMP". */ - protected function getStompTransaction( $antiMessage = false, $recoverTimestamp = false ) { + protected function getStompTransaction( $recoverTimestamp = false ) { $transaction = array( 'gateway_txn_id' => $this->getTransactionGatewayTxnID(), 'payment_method' => $this->getData_Unstaged_Escaped( 'payment_method' ), @@ -1919,19 +1919,19 @@ // in case the transaction already had keys with those values $transaction = array_merge( $stomp_data, $transaction ); - // And now determine the date; which is annoyingly not as easy as one would like it - // if we're attempting to recover some data: ie: we're an orphan - $timestamp = null; - if ( $recoverTimestamp === true ) { - if ( !is_null( $this->getData_Unstaged_Escaped( 'date' ) ) ) { - $timestamp = $this->getData_Unstaged_Escaped( 'date' ); - } elseif ( !is_null( $this->getData_Unstaged_Escaped( 'ts' ) ) ) { - // That this works is mildly surprising - $timestamp = strtotime( $this->getData_Unstaged_Escaped( 'ts' ) ); - } + // And now determine the date; which is annoyingly not as easy as one would like it + // if we're attempting to recover some data: ie: we're an orphan + // FIXME: Can't we make this the default? + $timestamp = null; + if ( $recoverTimestamp === true ) { + if ( !is_null( $this->getData_Unstaged_Escaped( 'date' ) ) ) { + $timestamp = $this->getData_Unstaged_Escaped( 'date' ); + } elseif ( !is_null( $this->getData_Unstaged_Escaped( 'ts' ) ) ) { + // That this works is mildly surprising + $timestamp = strtotime( $this->getData_Unstaged_Escaped( 'ts' ) ); } - $transaction['date'] = ( $timestamp === null ) ? time() : $timestamp; } + $transaction['date'] = ( $timestamp === null ) ? time() : $timestamp; return $transaction; } diff --git a/globalcollect_gateway/scripts/orphan_adapter.php b/globalcollect_gateway/scripts/orphan_adapter.php index d14bbfe..0895ab8 100644 --- a/globalcollect_gateway/scripts/orphan_adapter.php +++ b/globalcollect_gateway/scripts/orphan_adapter.php @@ -150,59 +150,29 @@ } /** - * Copying this here because it's the fastest way to bring in an actual timestamp. - */ - protected function doStompTransaction() { - if ( !$this->getGlobal( 'EnableStomp' ) ) { - return; - } - $this->debugarray[] = "Attempting Stomp Transaction!"; - $hook = ''; - - $status = $this->getFinalStatus(); - switch ( $status ) { - case 'complete': - $hook = 'gwStomp'; - break; - case 'pending': - case 'pending-poke': - $hook = 'gwPendingStomp'; - break; - } - if ( $hook === '' ) { - $this->debugarray[] = "No Stomp Hook Found for FINAL_STATUS $status"; - return; - } - - if ( !is_null( $this->getData_Unstaged_Escaped( 'date' ) ) ) { - $timestamp = $this->getData_Unstaged_Escaped( 'date' ); - } else { - if ( !is_null( $this->getData_Unstaged_Escaped( 'ts' ) ) ) { - $timestamp = strtotime( $this->getData_Unstaged_Escaped( 'ts' ) ); //I hate that this works. - } else { - $timestamp = time(); - } - } - - // send the thing. - $transaction = array( - 'response' => $this->getTransactionMessage(), - 'date' => $timestamp, - 'gateway_txn_id' => $this->getTransactionGatewayTxnID(), - //'language' => '', - ); - $transaction += $this->getData_Unstaged_Escaped(); - - try { - WmfFramework::runHooks( $hook, array( $transaction ) ); - } catch ( Exception $e ) { - $this->logger->critical( "STOMP ERROR. Could not add message. " . $e->getMessage() ); - } - } - - /** * Override live adapter with a no-op since orphan doesn't have any new info * before GET_ORDERSTATUS */ protected function pre_process_get_orderstatus() { } + + /** + * Add some extra fields to the wire message + * @see GatewayAdapter::getStompTransaction + */ + protected function getStompTransaction() { + // TODO: Why is the timestamp handled differently only for orphans? + // Rebuild the timestamp if necessary and possible, otherwise use the current time. + if ( !$this->getData_Unstaged_Escaped( 'date' ) ) { + if ( $this->getData_Unstaged_Escaped( 'ts' ) ) { + $timestamp = strtotime( $this->getData_Unstaged_Escaped( 'ts' ) ); //I hate that this works. + } else { + $timestamp = time(); + } + $this->addRequestData( array( + 'date' => $timestamp, + ) ); + } + + $transaction = parent::getStompTransaction(); + } } diff --git a/globalcollect_gateway/scripts/orphans.php b/globalcollect_gateway/scripts/orphans.php index 24d4bcf..22b0e9d 100644 --- a/globalcollect_gateway/scripts/orphans.php +++ b/globalcollect_gateway/scripts/orphans.php @@ -13,12 +13,12 @@ require_once( "$IP/maintenance/Maintenance.php" ); class GlobalCollectOrphanRectifier extends Maintenance { - - protected $killfiles = array(); - protected $order_ids = array(); - protected $max_per_execute = 500; //only really used if you're going by-file. protected $adapter; - + protected $logger; + protected $handled_ids = array(); + protected $removed_message_count; + protected $start_time; + public function execute() { //have to turn this off here, until we know it's using the user's ip, and //not 127.0.0.1 during the batch process. @@ -28,26 +28,6 @@ return; } $wgDonationInterfaceEnableIPVelocityFilter = false; - - $func = 'parse_files'; - if ( $this->getOrphanGlobal( 'override_command_line_params' ) ){ - //do that - $func = $this->getOrphanGlobal( 'function' ); - $this->target_execute_time = $this->getOrphanGlobal( 'target_execute_time' ); - $this->max_per_execute = $this->getOrphanGlobal( 'max_per_execute' ); - } else { - if ( !empty( $_SERVER['argv'][1] ) ){ - if ( $_SERVER['argv'][1] === 'stomp' ){ - $func = 'orphan_stomp'; - if ( !empty( $_SERVER['argv'][2] ) && is_numeric( $_SERVER['argv'][2] ) ){ - $this->target_execute_time = $_SERVER['argv'][2]; - } - } elseif ( is_numeric( $_SERVER['argv'][1] ) ){ - $this->max_per_execute = $_SERVER['argv'][1]; - } - } - } - // FIXME: Is this just to trigger batch mode? $data = array( @@ -70,25 +50,9 @@ //Then, we go back and pull more... and that same one is in the list again. We should stop after one try per message per execute. //We should also be smart enough to not process things we believe we just deleted. $this->handled_ids = array(); - - //first, we need to... clean up the limbo queue. - - //building in some redundancy here. - $collider_keepGoing = true; - $am_called_count = 0; - while ( $collider_keepGoing ){ - $antimessageCount = $this->handleStompAntiMessages(); - $am_called_count += 1; - if ( $antimessageCount < 10 ){ - $collider_keepGoing = false; - } else { - sleep(2); //two seconds. - } - } - $this->logger->info( 'Removed ' . $this->removed_message_count . ' messages and antimessages.' ); - + if ( $this->keepGoing() ){ - //Pull a batch of CC orphans, keeping in mind that Things May Have Happened in the small slice of time since we handled the antimessages. + //Pull a batch of CC orphans $orphans = $this->getStompOrphans(); while ( count( $orphans ) && $this->keepGoing() ){ echo count( $orphans ) . " orphans left this batch\n"; @@ -99,8 +63,6 @@ // TODO: Maybe we can simplify by checking that modified time < job start time. echo "Attempting to rectify orphan $correlation_id\n"; if ( $this->rectifyOrphan( $orphan ) ){ - // TODO: Stop mirroring to STOMP. - $this->addStompCorrelationIDToAckBucket( $correlation_id ); $this->ackMessage( $correlation_id ); $this->handled_ids[$correlation_id] = 'rectified'; } else { @@ -108,17 +70,12 @@ } } } - // TODO: Stop mirroring to STOMP. - $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. if ( $this->keepGoing() ){ $orphans = $this->getStompOrphans(); } } } - // TODO: Stop mirroring to STOMP. - $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. - //TODO: Make stats squirt out all over the place. $am = 0; $rec = 0; @@ -126,9 +83,6 @@ $fe = 0; foreach( $this->handled_ids as $id=>$whathappened ){ switch ( $whathappened ){ - case 'antimessage' : - $am += 1; - break; case 'rectified' : $rec += 1; break; @@ -141,7 +95,6 @@ } } $final = "\nDone! Final results: \n"; - $final .= " $am destroyed via antimessage (called $am_called_count times) \n"; $final .= " $rec rectified orphans \n"; $final .= " $err errored out \n"; $final .= " $fe false orphans caught \n"; @@ -175,54 +128,6 @@ return $elapsed; } - function addStompCorrelationIDToAckBucket( $correlation_id, $ackNow = false ){ - static $bucket = array(); - $count = 50; //sure. Why not? - if ( $correlation_id ) { - $bucket[$correlation_id] = "'$correlation_id'"; //avoiding duplicates. - $this->handled_ids[$correlation_id] = 'antimessage'; - } - if ( count( $bucket ) && ( count( $bucket ) >= $count || $ackNow ) ){ - //ack now. - echo 'Acking ' . count( $bucket ) . " bucket messages.\n"; - $selector = 'JMSCorrelationID IN (' . implode( ", ", $bucket ) . ')'; - $ackMe = stompFetchMessages( 'cc-limbo', $selector, $count * 100 ); //This is outrageously high, but I just want to be reasonably sure we get all the matches. - $retrieved_count = count( $ackMe ); - if ( $retrieved_count ){ - stompAckMessages( $ackMe ); - $this->removed_message_count += $retrieved_count; - echo "Done acking $retrieved_count messages. \n"; - } else { - echo "Oh noes! No messages retrieved for $selector...\n"; - } - $bucket = array(); - } - - } - - function handleStompAntiMessages(){ - $selector = "antimessage = 'true' AND gateway='globalcollect'"; - $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 ); - $count = 0; - while ( count( $antimessages ) > 10 && $this->keepGoing() ){ //if there's an antimessage, we can ack 'em all right now. - echo "Colliding " . count( $antimessages ) . " antimessages\n"; - $count += count( $antimessages ); - foreach ( $antimessages as $message ){ - //add the correlation ID to the ack bucket. - if (array_key_exists('correlation-id', $message->headers)) { - $this->addStompCorrelationIDToAckBucket( $message->headers['correlation-id'] ); - } else { - echo 'The STOMP message ' . $message->headers['message-id'] . " has no correlation ID!\n"; - } - } - $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. - $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 ); - } - $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it. - $this->logger->info( "Found $count antimessages." ); - return $count; - } - protected function ackMessage( $correlation_id ) { $this->handled_ids[$correlation_id] = 'antimessage'; @@ -238,14 +143,9 @@ * decoded stomp message body. */ protected function getStompOrphans(){ - // TODO: Remove STOMP block. // FIXME: Expiration should be set in configuration, and enforced by // the queue's native expiry anyway. $time_buffer = 60*20; //20 minutes? Sure. Why not? - $selector = "payment_method = 'cc' AND gateway='globalcollect'"; - echo "Fetching 300 Orphans\n"; - $messages = stompFetchMessages( 'cc-limbo', $selector, 300 ); - // TODO: Batch size from config. $batch_size = 300; echo "Fetching {$batch_size} Orphans\n"; @@ -255,43 +155,24 @@ // TODO: Quietly assert that $messages === $messagesDQ. $orphans = array(); - $false_orphans = array(); foreach ( $messages as $message ){ - //this next block will do quite a lot of antimessage collision - //when the queue is not being railed. - if ( array_key_exists('antimessage', $message->headers ) ){ - $correlation_id = $message->headers['correlation-id']; - $false_orphans[] = $correlation_id; - echo "False Orphan! $correlation_id \n"; - } else { - //legit message - if ( !array_key_exists( $message->headers['correlation-id'], $this->handled_ids ) ) { - //check the timestamp to see if it's old enough. - $decoded = json_decode($message->body, true); - if ( array_key_exists( 'date', $decoded ) ){ - $elapsed = $this->start_time - $decoded['date']; - if ( $elapsed > $time_buffer ){ - //we got ourselves an orphan! - $correlation_id = $message->headers['correlation-id']; - $order_id = explode('-', $correlation_id); - $order_id = $order_id[1]; - $decoded['order_id'] = $order_id; - $decoded = unCreateQueueMessage($decoded); - $decoded['card_num'] = ''; - $orphans[$correlation_id] = $decoded; - echo "Found an orphan! $correlation_id \n"; - } + $correlation_id = $message['correlation-id']; + if ( !array_key_exists( $correlation_id, $this->handled_ids ) ) { + //check the timestamp to see if it's old enough. + if ( array_key_exists( 'date', $message ) ) { + $elapsed = $this->start_time - $message['date']; + if ( $elapsed > $time_buffer ){ + //we got ourselves an orphan! + // FIXME: Reuse unstaging function. + $order_id = explode( '-', $correlation_id ); + $order_id = $order_id[1]; + $message['order_id'] = $order_id; + $message = $this->unCreateQueueMessage( $message ); + $message['card_num'] = ''; + $orphans[$correlation_id] = $message; + echo "Found an orphan! $correlation_id \n"; } } - } - } - - // TODO: Remove STOMP block. - foreach ( $orphans as $cid => $data ){ - if ( in_array( $cid, $false_orphans ) ){ - unset( $orphans[$cid] ); - $this->addStompCorrelationIDToAckBucket( $cid ); - $this->handled_ids[ $cid ] = 'false_orphan'; } } @@ -350,22 +231,6 @@ // ADDITIONAL: log out what you did here, to... somewhere. // Preferably *before* you rewrite the Order ID file. - //we may need to unset some hooks out here. Anything that requires user interaction would make no sense here. - $i = 0; - foreach($payments as $payment_data){ - if ($i < $this->max_per_execute){ - ++$i; - if ( $this->rectifyOrphan( $payment_data['unstaged'] ) ) { - unset( $this->order_ids[$payment_data['unstaged']['order_id']] ); - } - } - } - - if ($outstanding_count != count($this->order_ids)){ - $this->rewriteOrderIds(); - } - } - /** * Uses the Orphan Adapter to rectify (complete the charge for) a single orphan. Returns a boolean letting the caller know if * the orphan has been fully rectified or not. @@ -527,8 +392,38 @@ fclose($handle); $array = file($file, FILE_SKIP_EMPTY_LINES); } - - return $array; + } + + /** + * Called by the orphan rectifier to change a queue message back into a gateway + * transaction array, basically undoing the mappings from createQueueMessage + * + * @param array $transaction STOMP message + * + * @return array message with queue keys remapped to gateway keys + */ + protected function unCreateQueueMessage( $transaction ) { + // For now, this function assumes that we have a complete queue message. + // TODO: Something more robust and programmatic, as time allows. This whole file is just terrible. + + $rekey = array( + 'first_name' => 'fname', + 'last_name' => 'lname', + 'street_address' => 'street', + 'state_province' => 'state', + 'postal_code' => 'zip', + 'currency' => 'currency_code', + 'gross' => 'amount', + ); + + foreach ( $rekey as $stomp => $di ){ + if ( isset( $transaction[$stomp] ) ){ + $transaction[$di] = $transaction[$stomp]; + unset($transaction[$stomp]); + }; + } + + return $transaction; } } -- To view, visit https://gerrit.wikimedia.org/r/207741 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I36b8ce69f74e6204ded478f4cdae9c23a8573c09 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/DonationInterface Gerrit-Branch: master Gerrit-Owner: Awight <awi...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits