Hi Peter,

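With this fix, if I read it correctly, won't the sending thread be stuck sleeping for a long time when a node crashes while data is being sent? Each retry sleeps recoverTimeout*counter. With the defaults that is 5+10+15+20+25 = 75 seconds, not just recoverTimeout*recoverCounter = 25 seconds, and it happens for every queued message that fails. If that is the case, I suggest that we use recoverCounter=0 as the default value.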

Did I miss something?

Filip

[EMAIL PROTECTED] wrote:
Author: pero
Date: Wed Oct 18 09:37:42 2006
New Revision: 465293

URL: http://svn.apache.org/viewvc?view=rev&rev=465293
Log:
Made recovery more robust.

Modified:
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
    tomcat/container/tc5.5.x/webapps/docs/changelog.xml

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java Wed Oct 18 09:37:42 2006
@@ -54,7 +54,7 @@
     /**
      * The descriptive information about this implementation.
      */
-    private static final String info = "FastAsyncSocketSender/3.0";
+    private static final String info = "FastAsyncSocketSender/3.1";
 
     // ----------------------------------------------------- Instance Variables
 
@@ -69,6 +69,16 @@
     private FastQueueThread queueThread = null;
 
     /**
+     * recover timeout ( default 5 secs)
+     */
+    private long recoverTimeout = 5000;
+
+    /**
+     * number of recover tries
+     */
+    private int recoverCounter = 5;
+
+    /**
      * Count number of queue message
      */
     private long inQueueCounter = 0;
@@ -229,6 +239,40 @@
     }
 
     /**
+     * get current push message recover timeout
+     * @return current push message recover timeout
+     */
+    public long getRecoverTimeout() {
+
+        return recoverTimeout;
+    }
+
+    /**
+     * Set recover timeout (default 5000 msec)
+     * @param timeout
+     */
+    public void setRecoverTimeout(long timeout) {
+        recoverTimeout = timeout;
+    }
+
+    /**
+     * get current push message recover counter
+     * @return current push message recover counter
+     */
+    public int getRecoverCounter() {
+
+        return recoverCounter;
+    }
+
+    /**
+     * Set recover counter (default 5)
+     * @param counter
+     */
+    public void setRecoverCounter(int counter) {
+        recoverCounter = counter;
+    }
+
+    /**
      * change active the queue Thread priority
      * @param threadPriority value must be between MIN and MAX Thread Priority
      * @exception IllegalArgumentException
@@ -465,25 +509,62 @@
         }
 
         /**
-         * @param entry
+         * Push all messages from the queue to other nodes. If recovery is
+         * configured, failed messages are resent after some waits.
+         * @param entry list of messages
          */
         protected void pushQueuedMessages(LinkObject entry) {
             do {
                 int messagesize = 0;
+                ClusterData data = null ;
                 try {
-                    ClusterData data = (ClusterData) entry.data();
+                    data = (ClusterData) entry.data();
                     messagesize = data.getMessage().length;
                     sender.pushMessage(data);
                 } catch (Exception x) {
-                    log.warn(sm.getString(
-                            "AsyncSocketSender.send.error", entry
+                    long rTimeout = sender.getRecoverTimeout() ;
+                    int rCounter = sender.getRecoverCounter() ;
+                    if(data != null &&
+                            rTimeout > 0 &&
+                            rCounter > 0) {
+                        // wait that network get stabler
+                        int counter = 1;
+                        boolean success = false ;
+                        do {
+                            try {
+                                Thread.sleep(rTimeout*counter);
+                            } catch (Exception sleep) {
+                            }
+                            try {
+                                if(log.isDebugEnabled()) {
+                                    log.debug(sm.getString("AsyncSocketSender.send.recover",
+                                            entry.getKey(),
+                                            new Integer(counter),
+                                            new Integer(rCounter), new Long(rTimeout))) ;
+                                }
+                                sender.pushMessage(data);
+                                success = true;
+                            } catch (Exception xx) {
+                                counter++;
+                            }
+                        } while (keepRunning && !success && counter <= rCounter);
+
+                        if(!success) {
+                            log.warn(sm.getString(
+                                    "AsyncSocketSender.send.error", entry
                                     .getKey()), x);
-                } finally {
+                        }
+                    } else {
+                        log.warn(sm.getString(
+                                "AsyncSocketSender.send.error", entry
+                                .getKey()), x);
+                    }
+               } finally {
                     outQueueCounter++;
                     decQueuedNrOfBytes(messagesize);
                 }
                 entry = entry.next();
-            } while (entry != null);
+            } while (keepRunning && entry != null);
         }
}

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Wed Oct 18 09:37:42 2006
@@ -2,6 +2,7 @@
 AsyncSocketSender.queue.message=Queue message to [{0}:{1,number,integer}] id=[{2}] size={3}
 AsyncSocketSender.send.error=Unable to asynchronously send session with id=[{0}] - message will be ignored.
 AsyncSocketSender.queue.empty=Queue in sender [{0}:{1,number,integer}] returned null element!
+AsyncSocketSender.send.recover=Recover queued message id=[{0}] after failure and send again ( current counter={1,number,integer}, max counter={2,number,integer}, timeout={3,number,long})
 cluster.mbean.register.already=MBean {0} already registered!
 FastAsyncSocketSender.setThreadPriority=[{0}:{1,number,integer}] set priority to {2}
 FastAsyncSocketSender.min.exception=[{0}:{1,number,integer}] new priority {2} < MIN_PRIORITY

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Wed Oct 18 09:37:42 2006
@@ -620,6 +620,12 @@
           description="after send failure make a resend"
                             is="true"
                  type="boolean" />
+    <attribute   name="recoverTimeout"
+          description="recover Timeout after push message failure (default 5000 msec)"
+                 type="long" />
+    <attribute   name="recoverCounter"
+          description="number of recover tries (default 5)"
+                 type="int" />
     <attribute   name="connected"
                  is="true"
           description="socket connected"

Modified: tomcat/container/tc5.5.x/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Wed Oct 18 09:37:42 2006
@@ -84,6 +84,14 @@
       </fix>
     </changelog>
   </subsection>
+  <subsection name="Cluster">
+    <changelog>
+      <add>
+        Add better recovery at FastAsyncSocketSender. Made the strategy more robust for temporary connection problems (pero)
+      </add>
+    </changelog>
+  </subsection>
+
 </section>


---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]




---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to