Author: pero
Date: Wed Oct 18 09:37:42 2006
New Revision: 465293
URL: http://svn.apache.org/viewvc?view=rev&rev=465293
Log:
Made recovery more robust.
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
tomcat/container/tc5.5.x/webapps/docs/changelog.xml
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
Wed Oct 18 09:37:42 2006
@@ -54,7 +54,7 @@
/**
* The descriptive information about this implementation.
*/
- private static final String info = "FastAsyncSocketSender/3.0";
+ private static final String info = "FastAsyncSocketSender/3.1";
// ----------------------------------------------------- Instance Variables
@@ -69,6 +69,16 @@
private FastQueueThread queueThread = null;
/**
+ * recover timeout ( default 5 secs)
+ */
+ private long recoverTimeout = 5000;
+
+ /**
+ * number of recover tries
+ */
+ private int recoverCounter = 5;
+
+ /**
* Count number of queue message
*/
private long inQueueCounter = 0;
@@ -229,6 +239,40 @@
}
/**
+ * get current push message recover timeout
+ * @return current push message recover timeout
+ */
+ public long getRecoverTimeout() {
+
+ return recoverTimeout;
+ }
+
+ /**
+ * Set recover timeout (default 5000 msec)
+ * @param timeout
+ */
+ public void setRecoverTimeout(long timeout) {
+ recoverTimeout = timeout;
+ }
+
+ /**
+ * get current push message recover counter
+ * @return current push message recover counter
+ */
+ public int getRecoverCounter() {
+
+ return recoverCounter;
+ }
+
+ /**
+ * Set recover couner (default 5 )
+ * @param counter
+ */
+ public void setRecoverCounter(int counter) {
+ recoverCounter = counter;
+ }
+
+ /**
* change active the queue Thread priority
* @param threadPriority value must be between MIN and MAX Thread Priority
* @exception IllegalArgumentException
@@ -465,25 +509,62 @@
}
/**
- * @param entry
+ * Push all messages from queue to other nodes. Is revovery configured
+ * make a resends with some waits.
+ * @param entry list of messages
*/
protected void pushQueuedMessages(LinkObject entry) {
do {
int messagesize = 0;
+ ClusterData data = null ;
try {
- ClusterData data = (ClusterData) entry.data();
+ data = (ClusterData) entry.data();
messagesize = data.getMessage().length;
sender.pushMessage(data);
} catch (Exception x) {
- log.warn(sm.getString(
- "AsyncSocketSender.send.error", entry
+ long rTimeout = sender.getRecoverTimeout() ;
+ int rCounter = sender.getRecoverCounter() ;
+ if(data != null &&
+ rTimeout > 0 &&
+ rCounter > 0) {
+ // wait that network get stabler
+ int counter = 1;
+ boolean success = false ;
+ do {
+ try {
+ Thread.sleep(rTimeout*counter);
+ } catch (Exception sleep) {
+ }
+ try {
+ if(log.isDebugEnabled()) {
+
log.debug(sm.getString("AsyncSocketSender.send.recover",
+ entry.getKey(),
+ new Integer(counter),
+ new Integer(rCounter), new Long(rTimeout))) ;
+ }
+ sender.pushMessage(data);
+ success = true;
+ } catch (Exception xx) {
+ counter++;
+ }
+ } while (keepRunning && !success && counter <=
rCounter);
+
+ if(!success) {
+ log.warn(sm.getString(
+ "AsyncSocketSender.send.error", entry
.getKey()), x);
- } finally {
+ }
+ } else {
+ log.warn(sm.getString(
+ "AsyncSocketSender.send.error", entry
+ .getKey()), x);
+ }
+ } finally {
outQueueCounter++;
decQueuedNrOfBytes(messagesize);
}
entry = entry.next();
- } while (entry != null);
+ } while (keepRunning && entry != null);
}
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
Wed Oct 18 09:37:42 2006
@@ -2,6 +2,7 @@
AsyncSocketSender.queue.message=Queue message to [{0}:{1,number,integer}]
id=[{2}] size={3}
AsyncSocketSender.send.error=Unable to asynchronously send session with
id=[{0}] - message will be ignored.
AsyncSocketSender.queue.empty=Queue in sender [{0}:{1,number,integer}]
returned null element!
+AsyncSocketSender.send.recover=Recover queued message id=[{0}] after failure and send again ( current counter={1,number,integer}, max counter={2,number,integer}, timeout={3,number,long})
cluster.mbean.register.already=MBean {0} already registered!
FastAsyncSocketSender.setThreadPriority=[{0}:{1,number,integer}] set priority
to {2}
FastAsyncSocketSender.min.exception=[{0}:{1,number,integer}] new priority {2}
< MIN_PRIORITY
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
Wed Oct 18 09:37:42 2006
@@ -620,6 +620,12 @@
description="after send failure make a resend"
is="true"
type="boolean" />
+ <attribute name="recoverTimeout"
+ description="recover Timeout after push message failure (default 5000
msec)"
+ type="long" />
+ <attribute name="recoverCounter"
+ description="number of recover tries (default 5)"
+ type="int" />
<attribute name="connected"
is="true"
description="socket connected"
Modified: tomcat/container/tc5.5.x/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?view=diff&rev=465293&r1=465292&r2=465293
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Wed Oct 18 09:37:42 2006
@@ -84,6 +84,14 @@
</fix>
</changelog>
</subsection>
+ <subsection name="Cluster">
+ <changelog>
+ <add>
+ Add better recovery at FastAsyncQueueSender. Made the startegy more
robust for temporary connection problems (pero)
+ </add>
+ </changelog>
+ </subsection>
+
</section>
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]