Author: jlim
Date: Thu Oct 18 20:54:45 2007
New Revision: 586251

URL: http://svn.apache.org/viewvc?rev=586251&view=rev
Log:
applied patch for AMQ-1440 and AMQ-1439

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=586251&r1=586250&r2=586251&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
 Thu Oct 18 20:54:45 2007
@@ -301,6 +301,14 @@
         reconnectTask.shutdown();
     }
 
+       public int getMinAckCount() {
+               return minAckCount;
+       }
+
+       public void setMinAckCount(int minAckCount) {
+               this.minAckCount = minAckCount;
+       }    
+    
     public long getInitialReconnectDelay() {
         return initialReconnectDelay;
     }
@@ -338,24 +346,14 @@
         try {
             synchronized (reconnectMutex) {
 
-                // If it was a request and it was not being tracked by
-                // the state tracker,
-                // then hold it in the requestMap so that we can replay
-                // it later.
-                boolean fanout = isFanoutCommand(command);
-                if (stateTracker.track(command) == null && 
command.isResponseRequired()) {
-                    int size = fanout ? minAckCount : 1;
-                    requestMap.put(new Integer(command.getCommandId()), new 
RequestCounter(command, size));
-                }
-
                 // Wait for transport to be connected.
-                while (connectedCount != minAckCount && !disposed && 
connectionFailure == null) {
+                while (connectedCount < minAckCount && !disposed && 
connectionFailure == null) {
                     LOG.debug("Waiting for at least " + minAckCount + " 
transports to be connected.");
                     reconnectMutex.wait(1000);
                 }
 
                 // Still not fully connected.
-                if (connectedCount != minAckCount) {
+                if (connectedCount < minAckCount) {
 
                     Exception error;
 
@@ -374,6 +372,16 @@
                     throw IOExceptionSupport.create(error);
                 }
 
+                // If it was a request and it was not being tracked by
+                // the state tracker,
+                // then hold it in the requestMap so that we can replay
+                // it later.
+                boolean fanout = isFanoutCommand(command);
+                if (stateTracker.track(command) == null && 
command.isResponseRequired()) {
+                    int size = fanout ? minAckCount : 1;
+                    requestMap.put(new Integer(command.getCommandId()), new 
RequestCounter(command, size));
+                }
+                
                 // Send the message.
                 if (fanout) {
                     for (Iterator<FanoutTransportHandler> iter = 
transports.iterator(); iter.hasNext();) {
@@ -543,4 +551,5 @@
     public boolean isFaultTolerant() {
         return true;
     }
+
 }


Reply via email to