https://www.mediawiki.org/wiki/Special:Code/MediaWiki/105513
Revision: 105513 Author: khorn Date: 2011-12-08 03:43:25 +0000 (Thu, 08 Dec 2011) Log Message: ----------- Fixes for the Orphan Rectifier, particularly in the ActiveMQ/Stomp area. Modified Paths: -------------- trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php Modified: trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php =================================================================== --- trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php 2011-12-08 03:43:07 UTC (rev 105512) +++ trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php 2011-12-08 03:43:25 UTC (rev 105513) @@ -250,6 +250,14 @@ function stompFetchMessages( $queue, $selector = null, $limit = 50 ){ global $wgStompQueueName, $wgPendingStompQueueName, $wgLimboStompQueueName, $wgCCLimboStompQueueName; + static $selector_last = null; + if ( !is_null( $selector_last ) && $selector_last != $selector ){ + $renew = true; + } else { + $renew = false; + } + $selector_last = $selector; + switch($queue){ case 'pending': $queue = $wgPendingStompQueueName; @@ -266,22 +274,22 @@ break; } - //This needs to be renewed every time, or the selectors won't work. - //So says the internets, at least. - $stomp = getDIStompConnection( true ); + //This needs to be renewed every time we change the selectors. + $stomp = getDIStompConnection( $renew ); $properties = array( 'ack' => 'client' ); if ( !is_null( $selector ) ){ $properties['selector'] = $selector; } - $returned = $stomp->subscribe( '/queue/' . $queue, $properties ); + $stomp->subscribe( '/queue/' . $queue, $properties ); $message = $stomp->readFrame(); $return = array(); while ( !empty( $message ) && count( $return ) < $limit ) { $return[] = $message; + $stomp->subscribe( '/queue/' . $queue, $properties ); $message = $stomp->readFrame(); } Modified: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php =================================================================== --- trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php 2011-12-08 03:43:07 UTC (rev 105512) +++ trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php 2011-12-08 03:43:25 UTC (rev 105513) @@ -73,9 +73,11 @@ $this->handled_ids[$correlation_id] = 'error'; } } + $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. + $orphans = $this->getStompOrphans(); } - $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it. + $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. //TODO: Make stats squirt out all over the place. $am = 0; @@ -113,15 +115,20 @@ if ( $correlation_id ) { $bucket[$correlation_id] = "'$correlation_id'"; //avoiding duplicates. $this->handled_ids[$correlation_id] = 'antimessage'; + echo "Added $correlation_id to the ack bucket : Total bucket count = " . count( $bucket ); } if ( count( $bucket ) && ( count( $bucket ) >= $count || $ackNow ) ){ //ack now. + echo 'Acking ' . count( $bucket ) . ' bucket messages.'; $selector = 'JMSCorrelationID IN (' . implode( ", ", $bucket ) . ')'; $ackMe = stompFetchMessages( 'cc-limbo', $selector, $count * 100 ); //This is outrageously high, but I just want to be reasonably sure we get all the matches. $retrieved_count = count( $ackMe ); if ( $retrieved_count ){ stompAckMessages( $ackMe ); $this->removed_message_count += $retrieved_count; + echo "Done acking $retrieved_count messages. "; + } else { + echo "Oh noes! No messages to ack for some reason..."; } $bucket = array(); } @@ -132,7 +139,7 @@ $selector = "antimessage = 'true'"; $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 ); $count = 0; - while ( count( $antimessages ) ){ //if there's an antimessage, we can ack 'em all right now. + while ( count( $antimessages ) && $this->keepGoing() ){ //if there's an antimessage, we can ack 'em all right now. $count += count( $antimessages ); foreach ( $antimessages as $message ){ //add the correlation ID to the ack bucket. @@ -166,7 +173,13 @@ $elapsed = $this->now - $decoded['date']; if ( $elapsed > $time_buffer ){ //we got ourselves an orphan! - $orphans[$message->headers['antimessage']] = $decoded; + $correlation_id = $message->headers['correlation-id']; + $order_id = explode('-', $correlation_id); + $order_id = $order_id[1]; + $decoded['order_id'] = $order_id; + $decoded['i_order_id'] = $order_id; + $orphans[$correlation_id] = $decoded; + echo "\nFound an orphan! $correlation_id"; } } } @@ -261,6 +274,7 @@ * @return boolean True if the orphan has been rectified, false if not. */ function rectifyOrphan( $data, $query_contribution_tracking = true ){ + echo "\nRectifying Orphan " . $data['order_id']; $rectified = false; $this->adapter->loadDataAndReInit( $data, $query_contribution_tracking ); _______________________________________________ MediaWiki-CVS mailing list MediaWiki-CVS@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs