Author: gatfora
Date: Wed Dec 20 07:20:37 2006
New Revision: 489111

URL: http://svn.apache.org/viewvc?view=rev&rev=489111
Log:
Add an internalSequenceId to the SequenceAck message

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 Wed Dec 20 07:20:37 2006
@@ -280,21 +280,10 @@
                        log.debug("Enter: 
SandeshaGlobalInHandler::processDroppedMessage");
 
                if (rmMsgContext.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION) {
-                       Sequence sequence = (Sequence) 
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-                       String sequenceId = null;
-
-                       if (sequence != null) {
-                               sequenceId = 
sequence.getIdentifier().getIdentifier();
-                       }
-
-                       SequencePropertyBeanMgr seqPropMgr = 
storageManager.getSequencePropertyBeanMgr();
-                       SequencePropertyBean receivedMsgsBean = 
seqPropMgr.retrieve(sequenceId,
-                                       
Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-                       String receivedMsgStr = receivedMsgsBean.getValue();
 
                        // Even though the duplicate message is dropped, hv to 
send the ack
                        // if needed.
-                       SequenceProcessor.sendAckIfNeeded(rmMsgContext, 
receivedMsgStr, storageManager);
+                       SequenceProcessor.sendAckIfNeeded(rmMsgContext, 
storageManager);
 
                }
                if (log.isDebugEnabled())

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Wed Dec 20 07:20:37 2006
@@ -263,7 +263,7 @@
                }
 
                // Sending acknowledgements
-               sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
+               sendAckIfNeeded(rmMsgCtx, storageManager);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
SequenceProcessor::processReliableMessage " + msgCtxPaused);
@@ -284,7 +284,7 @@
                return false;
        }
 
-       public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, String 
messagesStr, StorageManager storageManager)
+       public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, 
StorageManager storageManager)
                        throws AxisFault {
 
                if (log.isDebugEnabled())
@@ -351,6 +351,7 @@
                        ackBean.setMessageID(ackMsgCtx.getMessageID());
                        ackBean.setReSend(false);
                        ackBean.setSequenceID(sequencePropertyKey);
+                       ackBean.setInternalSequenceID(sequencePropertyKey);
                        EndpointReference to = ackMsgCtx.getTo();
                        if (to!=null)
                                ackBean.setToAddress(to.getAddress());

Modified: 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
 (original)
+++ 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
 Wed Dec 20 07:20:37 2006
@@ -10,11 +10,14 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.ConfigurationContextFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.SandeshaTestCase;
 import org.apache.sandesha2.client.SandeshaClient;
 import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SequenceReport;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.RangeString;
@@ -112,7 +115,7 @@
 
        }
        
-       public void testForceInvokeWithDiscardGaps () throws 
AxisFault,InterruptedException  {
+       public void testForceInvokeWithDiscardGaps () throws AxisFault  {
                
                 String to = "http://127.0.0.1:" + serverPort + 
"/axis2/services/RMSampleService";
                
@@ -139,11 +142,12 @@
                        
clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(3));
                        serviceClient.fireAndForget(getPingOMBlock("ping3"));
        
-                       Thread.sleep(5000);
+                       String internalSequenceId = 
SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+                       waitForMessageToBeAcked(serviceClient, 
internalSequenceId);
                        
                        StorageManager mgr = 
SandeshaUtil.getInMemoryStorageManager(configContext);
                        Transaction t = mgr.getTransaction();
-                       String inboundSequenceID = 
SandeshaUtil.getSequenceIDFromInternalSequenceID(SandeshaUtil.getInternalSequenceID(to,
 sequenceKey),
+                       String inboundSequenceID = 
SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceId,
                                        mgr);
                        t.commit();
                        
@@ -164,4 +168,51 @@
 
        }
        
+  /**
+   * Waits for the maximum of "waittime" for a message to be acked, before 
returning control to the application.
+   * @throws SandeshaException 
+   */
+  private void waitForMessageToBeAcked(ServiceClient serviceClient, String 
internalSequenceId) throws SandeshaException
+  {
+    // Get the highest out message number
+    ConfigurationContext context = 
serviceClient.getServiceContext().getConfigurationContext();
+    StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+    
+    // Get the sequence property bean manager
+    SequencePropertyBeanMgr beanMgr = 
storageManager.getSequencePropertyBeanMgr();
+    
+    // Get a transaction for the property finding
+    Transaction transaction = storageManager.getTransaction();
+    
+    // Get the highest out message property
+    SequencePropertyBean bean = beanMgr.retrieve(internalSequenceId, 
Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_NUMBER);
+    
+    transaction.commit();
+    
+    Long highestOutMsgKey = Long.valueOf(bean.getValue());
+    
+    long timeNow = System.currentTimeMillis();
+    long timeToComplete = timeNow + waitTime;
+    boolean complete = false;    
+    
+    while (!complete && timeNow < timeToComplete)
+    {
+      timeNow = System.currentTimeMillis();
+
+      try
+      {                              
+        SequenceReport sequenceReport = 
SandeshaClient.getOutgoingSequenceReport(serviceClient);
+        
+        if (sequenceReport.getCompletedMessages().contains(highestOutMsgKey))
+          complete = true;
+        else
+          Thread.sleep(tickTime);
+  
+      }
+      catch (Exception e)
+      {
+        // Ignore
+      }
+    }
+  }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to