https://www.mediawiki.org/wiki/Special:Code/MediaWiki/105513

Revision: 105513
Author:   khorn
Date:     2011-12-08 03:43:25 +0000 (Thu, 08 Dec 2011)
Log Message:
-----------
Fixes for the Orphan Rectifier, particularly in the ActiveMQ/Stomp area. 

Modified Paths:
--------------
    trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php
    trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php

Modified: trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php
===================================================================
--- trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php        2011-12-08 03:43:07 UTC (rev 105512)
+++ trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php        2011-12-08 03:43:25 UTC (rev 105513)
@@ -250,6 +250,14 @@
 function stompFetchMessages( $queue, $selector = null, $limit = 50 ){
        global $wgStompQueueName, $wgPendingStompQueueName, 
$wgLimboStompQueueName, $wgCCLimboStompQueueName;
        
+       static $selector_last = null;
+       if ( !is_null( $selector_last ) && $selector_last != $selector ){
+               $renew = true;
+       } else {
+               $renew = false;
+       }
+       $selector_last = $selector;
+       
        switch($queue){
                case 'pending':
                        $queue = $wgPendingStompQueueName;
@@ -266,22 +274,22 @@
                        break;
        }
        
-       //This needs to be renewed every time, or the selectors won't work. 
-       //So says the internets, at least.  
-       $stomp = getDIStompConnection( true ); 
+       //This needs to be renewed every time we change the selectors. 
+       $stomp = getDIStompConnection( $renew ); 
        
        $properties = array( 'ack' => 'client' );
        if ( !is_null( $selector ) ){
                $properties['selector'] = $selector;
        }
        
-       $returned = $stomp->subscribe( '/queue/' . $queue, $properties );
+       $stomp->subscribe( '/queue/' . $queue, $properties );
        $message = $stomp->readFrame();
        
        $return = array();
        
        while ( !empty( $message ) && count( $return ) < $limit ) {
                $return[] = $message;
+               $stomp->subscribe( '/queue/' . $queue, $properties );
                $message = $stomp->readFrame();
        }
        

Modified: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php
===================================================================
--- trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php        2011-12-08 03:43:07 UTC (rev 105512)
+++ trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php        2011-12-08 03:43:25 UTC (rev 105513)
@@ -73,9 +73,11 @@
                                        $this->handled_ids[$correlation_id] = 
'error';
                                }
                        }
+                       $this->addStompCorrelationIDToAckBucket( false, true ); 
//ack all outstanding. 
+                       $orphans = $this->getStompOrphans();
                }
                
-               $this->addStompCorrelationIDToAckBucket( false, true ); //this 
just acks everything that's waiting for it.
+               $this->addStompCorrelationIDToAckBucket( false, true ); //ack 
all outstanding.
 
                //TODO: Make stats squirt out all over the place.  
                $am = 0;
@@ -113,15 +115,20 @@
                if ( $correlation_id ) {
                        $bucket[$correlation_id] = "'$correlation_id'"; 
//avoiding duplicates.
                        $this->handled_ids[$correlation_id] = 'antimessage';
+                       echo "Added $correlation_id to the ack bucket : Total 
bucket count = " . count( $bucket );
                }
                if ( count( $bucket ) && ( count( $bucket ) >= $count || 
$ackNow ) ){
                        //ack now.
+                       echo 'Acking ' . count( $bucket ) . ' bucket messages.';
                        $selector = 'JMSCorrelationID IN (' . implode( ", ", 
$bucket ) . ')';
                        $ackMe = stompFetchMessages( 'cc-limbo', $selector, 
$count * 100 ); //This is outrageously high, but I just want to be reasonably 
sure we get all the matches. 
                        $retrieved_count = count( $ackMe );
                        if ( $retrieved_count ){
                                stompAckMessages( $ackMe );
                                $this->removed_message_count += 
$retrieved_count;
+                               echo "Done acking $retrieved_count messages. ";
+                       } else {
+                               echo "Oh noes! No messages to ack for some 
reason...";
                        }
                        $bucket = array();
                }
@@ -132,7 +139,7 @@
                $selector = "antimessage = 'true'";
                $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 
);
                $count = 0;
-               while ( count( $antimessages ) ){ //if there's an antimessage, 
we can ack 'em all right now. 
+               while ( count( $antimessages ) && $this->keepGoing() ){ //if 
there's an antimessage, we can ack 'em all right now. 
                        $count += count( $antimessages );
                        foreach ( $antimessages as $message ){
                                //add the correlation ID to the ack bucket. 
@@ -166,7 +173,13 @@
                                        $elapsed = $this->now - 
$decoded['date'];
                                        if ( $elapsed > $time_buffer ){
                                                //we got ourselves an orphan! 
-                                               
$orphans[$message->headers['antimessage']] = $decoded;
+                                               $correlation_id = 
$message->headers['correlation-id'];
+                                               $order_id = explode('-', 
$correlation_id);
+                                               $order_id = $order_id[1];
+                                               $decoded['order_id'] = 
$order_id;
+                                               $decoded['i_order_id'] = 
$order_id;
+                                               $orphans[$correlation_id] = 
$decoded;
+                                               echo "\nFound an orphan! 
$correlation_id";
                                        }
                                }
                        }
@@ -261,6 +274,7 @@
         * @return boolean True if the orphan has been rectified, false if not. 
         */
        function rectifyOrphan( $data, $query_contribution_tracking = true ){
+               echo "\nRectifying Orphan " . $data['order_id'];
                $rectified = false;
                
                $this->adapter->loadDataAndReInit( $data, 
$query_contribution_tracking );


_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to