Modified: webservices/sandesha/trunk/java/modules/persistence/src/main/java/org/apache/sandesha2/storage/jdbc/PersistentStorageManager.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/persistence/src/main/java/org/apache/sandesha2/storage/jdbc/PersistentStorageManager.java?rev=708805&r1=708804&r2=708805&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/persistence/src/main/java/org/apache/sandesha2/storage/jdbc/PersistentStorageManager.java (original) +++ webservices/sandesha/trunk/java/modules/persistence/src/main/java/org/apache/sandesha2/storage/jdbc/PersistentStorageManager.java Tue Oct 28 23:41:11 2008 @@ -47,39 +47,39 @@ /** * A Storage Manager implementation for managing Sandesha2 beans. - * + * <p/> * Needs this parameter in module.xml or axis2.xml : - * - * db.driver JDBC Driver class name - * db.connectionstring JDBC connection string - * db.user Data Base user name - * db.password Data Base user password - * - * Transactions are supposed to be attached to a thread (see inMemoryStorageManager) - * hence the ThreadLocal threadTransaction variable (instead of the transactions HashMap - * used by inMemoryStorageManager). - * - * MessageContexts are stored in a HashMap, as in inMemoryStorageManager, AND in DataBase - * as backup in case of failure. + * <p/> + * db.driver JDBC Driver class name + * db.connectionstring JDBC connection string + * db.user Data Base user name + * db.password Data Base user password + * <p/> + * Transactions are supposed to be attached to a thread (see inMemoryStorageManager) + * hence the ThreadLocal threadTransaction variable (instead of the transactions HashMap + * used by inMemoryStorageManager). + * <p/> + * MessageContexts are stored in a HashMap, as in inMemoryStorageManager, AND in DataBase + * as backup in case of failure. */ public class PersistentStorageManager extends StorageManager { - + private Connection DbConnection = null; private String DbConnectionString = null; private String DbDriver = null; private String DbUser = null; private String DbPassword = null; - private PersistentRMSBeanMgr pRMSBeanMgr = null; - private PersistentRMDBeanMgr pRMDBeanMgr = null; - private PersistentSenderBeanMgr pSenderBeanMgr = null; - private PersistentInvokerBeanMgr pInvokerBeanMgr = null; - private Sender sender = null; - private Invoker invoker = null; - private PollingManager pollingManager = null; - private boolean useSerialization = false; - private HashMap<String,MessageContext> storageMap = null; - private static ThreadLocal threadTransaction = null; + private PersistentRMSBeanMgr pRMSBeanMgr = null; + private PersistentRMDBeanMgr pRMDBeanMgr = null; + private PersistentSenderBeanMgr pSenderBeanMgr = null; + private PersistentInvokerBeanMgr pInvokerBeanMgr = null; + private Sender sender = null; + private Invoker invoker = null; + private PollingManager pollingManager = null; + private boolean useSerialization = false; + private HashMap<String, MessageContext> storageMap = null; + private static ThreadLocal threadTransaction = null; private static final Log log = LogFactory.getLog(PersistentStorageManager.class); public SandeshaThread getInvoker() { @@ -94,76 +94,72 @@ return sender; } - + public PersistentStorageManager(ConfigurationContext context) - throws SandeshaException - { - super (context); + throws SandeshaException { + super(context); log.info("create PersistentStorageManager"); - storageMap = new HashMap (); + storageMap = new HashMap(); threadTransaction = new ThreadLocal(); - pRMSBeanMgr = new PersistentRMSBeanMgr (this); - pRMDBeanMgr = new PersistentRMDBeanMgr (this); - pSenderBeanMgr = new PersistentSenderBeanMgr (this); - pInvokerBeanMgr = new PersistentInvokerBeanMgr (this); + pRMSBeanMgr = new PersistentRMSBeanMgr(this); + pRMDBeanMgr = new PersistentRMDBeanMgr(this); + pSenderBeanMgr = new PersistentSenderBeanMgr(this); + pInvokerBeanMgr = new PersistentInvokerBeanMgr(this); sender = new Sender(); // Note that while inOrder is a global property we can decide if we need the // invoker thread at this point. If we change this to be a sequence-level // property then we'll need to revisit this. SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration()); useSerialization = policy.isUseMessageSerialization(); - if(policy.isInOrder()) invoker = new Invoker(); - if(policy.isEnableMakeConnection()) pollingManager = new PollingManager(); - ModuleConfiguration mc = context.getAxisConfiguration().getModuleConfig("sandesha2"); - Parameter param = mc.getParameter("db.connectionstring"); - if ( param != null ) { - DbConnectionString = (String)param.getValue(); - log.debug(param.getName() + "=" + DbConnectionString); + if (policy.isInOrder()) invoker = new Invoker(); + if (policy.isEnableMakeConnection()) pollingManager = new PollingManager(); + ModuleConfiguration mc = context.getAxisConfiguration().getModuleConfig("sandesha2"); + Parameter param = mc.getParameter("db.connectionstring"); + if (param != null) { + DbConnectionString = (String) param.getValue(); + log.debug(param.getName() + "=" + DbConnectionString); } param = mc.getParameter("db.driver"); - if ( param != null ) { - DbDriver = (String)param.getValue(); - log.debug(param.getName() + "=" + DbDriver); + if (param != null) { + DbDriver = (String) param.getValue(); + log.debug(param.getName() + "=" + DbDriver); } param = mc.getParameter("db.user"); - if ( param != null ) { - DbUser = (String)param.getValue(); - log.debug(param.getName() + "=" + DbUser); + if (param != null) { + DbUser = (String) param.getValue(); + log.debug(param.getName() + "=" + DbUser); } param = mc.getParameter("db.password"); - if ( param != null ) { - DbPassword = (String)param.getValue(); - log.debug(param.getName() + "=" + DbPassword); + if (param != null) { + DbPassword = (String) param.getValue(); + log.debug(param.getName() + "=" + DbPassword); } } - public void shutdown() - { - if ( DbConnection != null ) { + + public void shutdown() { + if (DbConnection != null) { try { DbConnection.close(); DbConnection = null; - } catch (Exception ex) {} + } catch (Exception ex) { + } } super.shutdown(); } - public InvokerBeanMgr getInvokerBeanMgr() - { + public InvokerBeanMgr getInvokerBeanMgr() { return pInvokerBeanMgr; } - public RMDBeanMgr getRMDBeanMgr() - { + public RMDBeanMgr getRMDBeanMgr() { return pRMDBeanMgr; } - public RMSBeanMgr getRMSBeanMgr() - { + public RMSBeanMgr getRMSBeanMgr() { return pRMSBeanMgr; } - public SenderBeanMgr getSenderBeanMgr() - { + public SenderBeanMgr getSenderBeanMgr() { return pSenderBeanMgr; } @@ -171,15 +167,13 @@ return useSerialization; } - public boolean hasUserTransaction(MessageContext msg) - { + public boolean hasUserTransaction(MessageContext msg) { // Answer to : Is there a user transaction in play ? // but what is a 'user transaction' ? return false; } - - public Transaction getTransaction() - { + + public Transaction getTransaction() { Transaction transaction = (Transaction) threadTransaction.get(); if (transaction == null) { transaction = new JDBCTransaction(this); @@ -188,139 +182,131 @@ // We don't want to overwrite or return an existing transaction, as someone // else should decide if we commit it or not. If we get here then we probably // have a bug. - if(log.isDebugEnabled()) log.debug("Possible re-used transaction: "); + if (log.isDebugEnabled()) log.debug("Possible re-used transaction: "); transaction = null; } return transaction; } - - public void removeTransaction() - { + + public void removeTransaction() { threadTransaction.set(null); } /** - * Returns the connection attached to the current transaction if exists - * or the "common" connection. - * + * Returns the connection attached to the current transaction if exists + * or the "common" connection. + * * @return Data Base Connection */ - public Connection getDbConnection() - { + public Connection getDbConnection() { JDBCTransaction transaction = (JDBCTransaction) threadTransaction.get(); - if ( transaction == null )return DbConnection; + if (transaction == null) return DbConnection; return transaction.getDbConnection(); } - - public void initStorage (AxisModule moduleDesc) - throws SandeshaStorageException - { + + public void initStorage(AxisModule moduleDesc) + throws SandeshaStorageException { log.info("init PersistentStorageManager"); - if ( DbConnectionString == null || DbDriver == null ) - throw new SandeshaStorageException ("Can't proceed. Needed properties are not set."); + if (DbConnectionString == null || DbDriver == null) + throw new SandeshaStorageException("Can't proceed. Needed properties are not set."); - DbConnection = dbConnect(); + DbConnection = dbConnect(); } - - public Connection dbConnect () - throws SandeshaStorageException - { - try { - Class.forName(DbDriver); - return DriverManager.getConnection(DbConnectionString,DbUser,DbPassword); - } catch (Exception ex) { - log.error("Unable to create DB connection ", ex); - throw new SandeshaStorageException(ex); - } + + public Connection dbConnect() + throws SandeshaStorageException { + try { + Class.forName(DbDriver); + return DriverManager.getConnection(DbConnectionString, DbUser, DbPassword); + } catch (Exception ex) { + log.error("Unable to create DB connection ", ex); + throw new SandeshaStorageException(ex); + } } - public MessageContext retrieveMessageContext(String key,ConfigurationContext configContext) - throws SandeshaStorageException - { + public MessageContext retrieveMessageContext(String key, ConfigurationContext configContext) + throws SandeshaStorageException { log.debug("Enter retrieveMessageContext for key " + key); - /**/ + /**/ if (storageMap.containsKey(key)) { log.debug("retrieveMessageContext get from cache"); return (MessageContext) storageMap.get(key); } - /**/ + /**/ try { - Statement stmt = getDbConnection().createStatement(); - /**/ - ResultSet rs = stmt.executeQuery("select * from wsrm_msgctx where ctx_key='" + + Statement stmt = getDbConnection().createStatement(); + /**/ + ResultSet rs = stmt.executeQuery("select * from wsrm_msgctx where ctx_key='" + key + "'"); - rs.next(); - MessageContext msgCtx = new MessageContext(); - msgCtx.readExternal(new ObjectInputStream(rs.getBinaryStream("ctx"))); - msgCtx.activate(configContext); - msgCtx.setProperty(Sandesha2Constants.POST_FAILURE_MESSAGE,Sandesha2Constants.VALUE_TRUE); - rs.close(); - stmt.close(); - log.debug("RetrieveMessageContext get from DB"); - return msgCtx; - } catch ( Exception ex) { + rs.next(); + MessageContext msgCtx = new MessageContext(); + msgCtx.readExternal(new ObjectInputStream(rs.getBinaryStream("ctx"))); + msgCtx.activate(configContext); + msgCtx.setProperty(Sandesha2Constants.POST_FAILURE_MESSAGE, Sandesha2Constants.VALUE_TRUE); + rs.close(); + stmt.close(); + log.debug("RetrieveMessageContext get from DB"); + return msgCtx; + } catch (Exception ex) { log.error("RetrieveMessageContext exception " + ex); throw new SandeshaStorageException(ex); } } - synchronized public void storeMessageContext(String key,MessageContext msgContext) - throws SandeshaStorageException - { - if ( log.isDebugEnabled() ) log.debug("Enter storeMessageContext for key " + key + " context " + msgContext); - storageMap.put(key,msgContext); + synchronized public void storeMessageContext(String key, MessageContext msgContext) + throws SandeshaStorageException { + if (log.isDebugEnabled()) log.debug("Enter storeMessageContext for key " + key + " context " + msgContext); + storageMap.put(key, msgContext); try { - PreparedStatement pstmt = getDbConnection().prepareStatement("insert into wsrm_msgctx(ctx_key,ctx)values(?,?)"); - pstmt.setString(1,key); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - msgContext.writeExternal(oos); - oos.close(); - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - pstmt.setBinaryStream(2, bais, bais.available()); - pstmt.execute(); - pstmt.close(); - } catch(Exception ex) { + PreparedStatement pstmt = getDbConnection().prepareStatement("insert into wsrm_msgctx(ctx_key,ctx)values(?,?)"); + pstmt.setString(1, key); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + msgContext.writeExternal(oos); + oos.close(); + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + pstmt.setBinaryStream(2, bais, bais.available()); + pstmt.execute(); + pstmt.close(); + } catch (Exception ex) { throw new SandeshaStorageException(ex); } } - synchronized public void updateMessageContext(String key,MessageContext msgContext) - throws SandeshaStorageException - { - if ( log.isDebugEnabled() ) log.debug("updateMessageContext key : " + key); - storageMap.put(key,msgContext); + synchronized public void updateMessageContext(String key, MessageContext msgContext) + throws SandeshaStorageException { + if (log.isDebugEnabled()) log.debug("updateMessageContext key : " + key); + storageMap.put(key, msgContext); PreparedStatement pstmt = null; try { - pstmt = getDbConnection().prepareStatement("update wsrm_msgctx set ctx=?" + - "where ctx_key='" + key + "'"); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - msgContext.writeExternal(oos); - oos.close(); - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - pstmt.setBinaryStream(1, bais, bais.available()); - pstmt.executeQuery(); - pstmt.close(); - } catch(Exception ex) { + pstmt = getDbConnection().prepareStatement("update wsrm_msgctx set ctx=?" + + "where ctx_key='" + key + "'"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + msgContext.writeExternal(oos); + oos.close(); + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + pstmt.setBinaryStream(1, bais, bais.available()); + pstmt.executeQuery(); + pstmt.close(); + } catch (Exception ex) { throw new SandeshaStorageException("Exception in updateMessageContext", ex); } } - + public void removeMessageContext(String key) - throws SandeshaStorageException - { - if ( log.isDebugEnabled() ) log.debug("removeMessageContext key : " + key); + throws SandeshaStorageException { + if (log.isDebugEnabled()) log.debug("removeMessageContext key : " + key); try { - Statement stmt = getDbConnection().createStatement(); - MessageContext messageInCache = (MessageContext) storageMap.get(key); - if (messageInCache!=null) storageMap.remove(key); - stmt.executeUpdate("delete from wsrm_msgctx where ctx_key='" + key + "'"); - stmt.close(); - } catch (Exception ex ) { + Statement stmt = getDbConnection().createStatement(); + MessageContext messageInCache = (MessageContext) storageMap.get(key); + if (messageInCache != null) storageMap.remove(key); + stmt.executeUpdate("delete from wsrm_msgctx where ctx_key='" + key + "'"); + stmt.close(); + } catch (Exception ex) { throw new SandeshaStorageException("Exception in removeMessageContext", ex); } } - + }
Modified: webservices/sandesha/trunk/java/modules/persistence/src/resources/module_config_axis2.xml URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/persistence/src/resources/module_config_axis2.xml?rev=708805&r1=708804&r2=708805&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/persistence/src/resources/module_config_axis2.xml (original) +++ webservices/sandesha/trunk/java/modules/persistence/src/resources/module_config_axis2.xml Tue Oct 28 23:41:11 2008 @@ -1,15 +1,15 @@ <!-- Add something like this in axis2.xml for the right DB --> - <moduleConfig name="sandesha2"> - <!-- MySQL - <parameter name="db.connectionstring">jdbc:mysql://localhost/wsrm</parameter> - <parameter name="db.driver">com.mysql.jdbc.Driver</parameter> - <parameter name="db.user"></parameter> - <parameter name="db.password"></parameter> - --> - <!-- Derby --> - <parameter name="db.connectionstring">jdbc:derby:derby/wsrm</parameter> - <parameter name="db.driver">org.apache.derby.jdbc.EmbeddedDriver</parameter> - <parameter name="db.user"></parameter> - <parameter name="db.password"></parameter> - <!-- --> - </moduleConfig> +<moduleConfig name="sandesha2"> + <!-- MySQL + <parameter name="db.connectionstring">jdbc:mysql://localhost/wsrm</parameter> + <parameter name="db.driver">com.mysql.jdbc.Driver</parameter> + <parameter name="db.user"></parameter> + <parameter name="db.password"></parameter> + --> + <!-- Derby --> + <parameter name="db.connectionstring">jdbc:derby:derby/wsrm</parameter> + <parameter name="db.driver">org.apache.derby.jdbc.EmbeddedDriver</parameter> + <parameter name="db.user"></parameter> + <parameter name="db.password"></parameter> + <!-- --> +</moduleConfig> Modified: webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/RMScenariosTest.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/RMScenariosTest.java?rev=708805&r1=708804&r2=708805&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/RMScenariosTest.java (original) +++ webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/RMScenariosTest.java Tue Oct 28 23:41:11 2008 @@ -39,46 +39,46 @@ public class RMScenariosTest extends SandeshaTestCase { - private boolean serverStarted = false; + private boolean serverStarted = false; protected ConfigurationContext configContext = null; protected String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService"; - + protected String repoPath = "target" + File.separator + "repos" + File.separator + "persistence-server"; protected String axis2_xml = "target" + File.separator + "repos" + File.separator + "persistence-server" + File.separator + "server_axis2.xml"; protected String repoPathClient = "target" + File.separator + "repos" + File.separator + "persistence-client"; protected String axis2_xmlClient = "target" + File.separator + "repos" + File.separator + "persistence-client" + File.separator + "client_axis2.xml"; - - public RMScenariosTest () { - super ("RMScenariosTest"); + + public RMScenariosTest() { + super("RMScenariosTest"); } - - public RMScenariosTest (String name) { + + public RMScenariosTest(String name) { super(name); } - public void setUp () throws Exception { + public void setUp() throws Exception { super.setUp(); if (!serverStarted) { startServer(repoPath, axis2_xml); - configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPathClient,axis2_xmlClient); + configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPathClient, axis2_xmlClient); } serverStarted = true; } - + /** * Override the teardown processing */ - public void tearDown () throws Exception { + public void tearDown() throws Exception { super.tearDown(); } - public void testPing() throws Exception { + public void testPing() throws Exception { // Run a ping test with sync acks runPing(false, false); - + // Run a ping test with async acks runPing(true, true); } @@ -86,25 +86,25 @@ public void testAsyncEcho() throws Exception { // Test async echo with sync acks Options clientOptions = new Options(); - runEcho(clientOptions, true, false, false,true,false); - + runEcho(clientOptions, true, false, false, true, false); + // Test async echo with async acks clientOptions = new Options(); - runEcho(clientOptions, true, true, false,true,false); - + runEcho(clientOptions, true, true, false, true, false); + // Test async echo with async acks and offer clientOptions = new Options(); - clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,SandeshaUtil.getUUID()); - runEcho(clientOptions, true, true, false,true,true); + clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID, SandeshaUtil.getUUID()); + runEcho(clientOptions, true, true, false, true, true); } - + public void testSyncEchoWithOffer() throws Exception { // Test sync echo with an offer, and the 1.1 spec Options clientOptions = new Options(); - clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,SandeshaUtil.getUUID()); - clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1); - runEcho(clientOptions, false, false, true,true,false); - + clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID, SandeshaUtil.getUUID()); + clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, Sandesha2Constants.SPEC_VERSIONS.v1_1); + runEcho(clientOptions, false, false, true, true, false); + // // Test sync echo with an offer, and the 1.0 spec. The offer is not automatic as this // // is a client that hasn't been built from WSDL. If the user's operations had been // // modelled properly then the offer would happen automatically. @@ -112,40 +112,40 @@ // clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,SandeshaUtil.getUUID()); // clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_0); // runEcho(clientOptions, false, false, true,false,false); - } + } - public void testSyncEcho() throws Exception { + public void testSyncEcho() throws Exception { // Test sync echo with no offer, and the 1.1 spec Options clientOptions = new Options(); - clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1); - runEcho(clientOptions, false, false, true,true,true); + clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, Sandesha2Constants.SPEC_VERSIONS.v1_1); + runEcho(clientOptions, false, false, true, true, true); } public void runPing(boolean asyncAcks, boolean stopListener) throws Exception { - + Options clientOptions = new Options(); - ServiceClient serviceClient = new ServiceClient (configContext,null); + ServiceClient serviceClient = new ServiceClient(configContext, null); serviceClient.setOptions(clientOptions); String sequenceKey = SandeshaUtil.getUUID(); clientOptions.setAction(pingAction); - clientOptions.setTo(new EndpointReference (to)); - clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey); + clientOptions.setTo(new EndpointReference(to)); + clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey); clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE, Constants.VALUE_TRUE); - - if(asyncAcks) { + + if (asyncAcks) { clientOptions.setUseSeparateListener(true); } serviceClient.fireAndForget(getPingOMBlock("ping1")); - + long limit = System.currentTimeMillis() + waitTime; Error lastError = null; - while(System.currentTimeMillis() < limit) { + while (System.currentTimeMillis() < limit) { Thread.sleep(tickTime); // Try the assertions each tick interval, until they pass or we time out - + try { SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(serviceClient); System.out.println("Checking Outbound Sequence: " + sequenceReport.getSequenceID()); @@ -155,86 +155,87 @@ lastError = null; break; - } catch(Error e) { - e.printStackTrace(); - System.out.println("Possible error:" + e); + } catch (Error e) { + e.printStackTrace(); + System.out.println("Possible error:" + e); lastError = e; } } - if(lastError != null) throw lastError; + if (lastError != null) throw lastError; if (stopListener) configContext.getListenerManager().stop(); - + serviceClient.cleanup(); } public void runEcho(Options clientOptions, boolean asyncReply, boolean asyncAcks, boolean explicitTermination, boolean checkInboundTermination, boolean stopListener) throws Exception { - + String sequenceKey = SandeshaUtil.getUUID(); - ServiceClient serviceClient = new ServiceClient (configContext,null); + ServiceClient serviceClient = new ServiceClient(configContext, null); serviceClient.setOptions(clientOptions); clientOptions.setAction(echoAction); - clientOptions.setTo(new EndpointReference (to)); - clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey); + clientOptions.setTo(new EndpointReference(to)); + clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey); clientOptions.setTransportInProtocol(Constants.TRANSPORT_HTTP); - if(asyncReply || asyncAcks) { + if (asyncReply || asyncAcks) { clientOptions.setUseSeparateListener(true); - - if(asyncAcks) { + + if (asyncAcks) { String acksTo = serviceClient.getMyEPR(Constants.TRANSPORT_HTTP).getAddress(); - clientOptions.setProperty(SandeshaClientConstants.AcksTo,acksTo); + clientOptions.setProperty(SandeshaClientConstants.AcksTo, acksTo); } else { String acksTo = AddressingConstants.Final.WSA_ANONYMOUS_URL; - clientOptions.setProperty(SandeshaClientConstants.AcksTo,acksTo); + clientOptions.setProperty(SandeshaClientConstants.AcksTo, acksTo); } } - - if(asyncAcks) { + + if (asyncAcks) { String acksTo = serviceClient.getMyEPR(Constants.TRANSPORT_HTTP).getAddress(); - clientOptions.setProperty(SandeshaClientConstants.AcksTo,acksTo); + clientOptions.setProperty(SandeshaClientConstants.AcksTo, acksTo); } else { String acksTo = AddressingConstants.Final.WSA_ANONYMOUS_URL; - clientOptions.setProperty(SandeshaClientConstants.AcksTo,acksTo); + clientOptions.setProperty(SandeshaClientConstants.AcksTo, acksTo); } // Establish a baseline count for inbound sequences List oldIncomingReports = SandeshaClient.getIncomingSequenceReports(configContext); - - TestCallback callback1 = new TestCallback ("Callback 1"); - serviceClient.sendReceiveNonBlocking (getEchoOMBlock("echo1",sequenceKey),callback1); - - TestCallback callback2 = new TestCallback ("Callback 2"); - serviceClient.sendReceiveNonBlocking (getEchoOMBlock("echo2",sequenceKey),callback2); - - if (!explicitTermination - && - !Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(clientOptions.getProperty(SandeshaClientConstants.RM_SPEC_VERSION))) { - + + TestCallback callback1 = new TestCallback("Callback 1"); + serviceClient.sendReceiveNonBlocking(getEchoOMBlock("echo1", sequenceKey), callback1); + + TestCallback callback2 = new TestCallback("Callback 2"); + serviceClient.sendReceiveNonBlocking(getEchoOMBlock("echo2", sequenceKey), callback2); + + if (!explicitTermination + && + !Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(clientOptions.getProperty(SandeshaClientConstants.RM_SPEC_VERSION))) + { + clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE, Constants.VALUE_TRUE); } - - TestCallback callback3 = new TestCallback ("Callback 3"); - serviceClient.sendReceiveNonBlocking (getEchoOMBlock("echo3",sequenceKey),callback3); - + + TestCallback callback3 = new TestCallback("Callback 3"); + serviceClient.sendReceiveNonBlocking(getEchoOMBlock("echo3", sequenceKey), callback3); + if (explicitTermination) { Thread.sleep(10000); SandeshaClient.terminateSequence(serviceClient); } - + long limit = System.currentTimeMillis() + waitTime; Error lastError = null; - while(System.currentTimeMillis() < limit) { + while (System.currentTimeMillis() < limit) { Thread.sleep(tickTime); // Try the assertions each tick interval, until they pass or we time out - + try { - - //assertions for the out sequence. + + //assertions for the out sequence. SequenceReport outgoingSequenceReport = SandeshaClient.getOutgoingSequenceReport(serviceClient); System.out.println("Checking Outbound Sequence: " + outgoingSequenceReport.getSequenceID()); assertTrue("Outbound message #1", outgoingSequenceReport.getCompletedMessages().contains(new Long(1))); @@ -242,46 +243,46 @@ assertTrue("Outbound message #3", outgoingSequenceReport.getCompletedMessages().contains(new Long(3))); assertEquals("Outbound sequence status: TERMINATED", SequenceReport.SEQUENCE_STATUS_TERMINATED, outgoingSequenceReport.getSequenceStatus()); assertEquals("Outbound sequence direction: OUT", SequenceReport.SEQUENCE_DIRECTION_OUT, outgoingSequenceReport.getSequenceDirection()); - + //assertions for the inbound sequence. The one we care about is a new sequence, //so it will not exist in the oldSequences list. List incomingSequences = SandeshaClient.getIncomingSequenceReports(configContext); SequenceReport incomingSequenceReport = getNewReport(incomingSequences, oldIncomingReports); System.out.println("Checking Inbound Sequence: " + incomingSequenceReport.getSequenceID()); String offer = (String) clientOptions.getProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID); - if(offer != null) assertEquals("Inbound seq id", offer, incomingSequenceReport.getSequenceID()); - assertEquals ("Inbound message count", 3, incomingSequenceReport.getCompletedMessages().size()); + if (offer != null) assertEquals("Inbound seq id", offer, incomingSequenceReport.getSequenceID()); + assertEquals("Inbound message count", 3, incomingSequenceReport.getCompletedMessages().size()); assertTrue("Inbound message #1", incomingSequenceReport.getCompletedMessages().contains(new Long(1))); assertTrue("Inbound message #2", incomingSequenceReport.getCompletedMessages().contains(new Long(2))); assertTrue("Inbound message #3", incomingSequenceReport.getCompletedMessages().contains(new Long(3))); - + if (checkInboundTermination) assertEquals("Inbound sequence status: TERMINATED", SequenceReport.SEQUENCE_STATUS_TERMINATED, incomingSequenceReport.getSequenceStatus()); - + assertEquals("Inbound sequence direction: IN", SequenceReport.SEQUENCE_DIRECTION_IN, incomingSequenceReport.getSequenceDirection()); - + assertTrue("Callback #1", callback1.isComplete()); assertEquals("Callback #1 data", "echo1", callback1.getResult()); - + assertTrue("Callback #2", callback2.isComplete()); assertEquals("Callback #2 data", "echo1echo2", callback2.getResult()); - + assertTrue("Callback #3", callback3.isComplete()); assertEquals("Callback #3 data", "echo1echo2echo3", callback3.getResult()); - + lastError = null; break; - } catch(Error e) { - e.printStackTrace(); - System.out.println("Possible error:" + e); + } catch (Error e) { + e.printStackTrace(); + System.out.println("Possible error:" + e); lastError = e; } } - if(lastError != null) throw lastError; - + if (lastError != null) throw lastError; + if (stopListener) configContext.getListenerManager().stop(); - + serviceClient.cleanup(); } @@ -289,13 +290,13 @@ // was established by this test. Note that some of the old sequences may have timed out. private SequenceReport getNewReport(List incomingSequences, List oldIncomingReports) { HashSet sequenceIds = new HashSet(); - for(Iterator oldSequences = oldIncomingReports.iterator(); oldSequences.hasNext(); ) { + for (Iterator oldSequences = oldIncomingReports.iterator(); oldSequences.hasNext();) { SequenceReport report = (SequenceReport) oldSequences.next(); sequenceIds.add(report.getSequenceID()); } - for(Iterator currentSequences = incomingSequences.iterator(); currentSequences.hasNext(); ) { + for (Iterator currentSequences = incomingSequences.iterator(); currentSequences.hasNext();) { SequenceReport report = (SequenceReport) currentSequences.next(); - if(!sequenceIds.contains(report.getSequenceID())) { + if (!sequenceIds.contains(report.getSequenceID())) { return report; } } Modified: webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/SandeshaTestCase.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/SandeshaTestCase.java?rev=708805&r1=708804&r2=708805&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/SandeshaTestCase.java (original) +++ webservices/sandesha/trunk/java/modules/persistence/src/test/java/org/apache/sandesha2/SandeshaTestCase.java Tue Oct 28 23:41:11 2008 @@ -52,16 +52,16 @@ import org.apache.commons.logging.LogFactory; public class SandeshaTestCase extends TestCase { - + String resourceDir = ""; //"test-resources"; - Properties properties = null; - final String PROPERTY_FILE_NAME = "sandesha2-test.properties"; - public final int DEFAULT_SERVER_TEST_PORT = 8060; - public ConfigurationContext serverConfigurationContext = null; - private final String RMServiceName = "RMSampleService"; + Properties properties = null; + final String PROPERTY_FILE_NAME = "sandesha2-test.properties"; + public final int DEFAULT_SERVER_TEST_PORT = 8060; + public ConfigurationContext serverConfigurationContext = null; + private final String RMServiceName = "RMSampleService"; private Log log = LogFactory.getLog(getClass()); - - private final static String applicationNamespaceName = "http://tempuri.org/"; + + private final static String applicationNamespaceName = "http://tempuri.org/"; private final static String echoString = "echoString"; private final static String ping = "ping"; private final static String Text = "Text"; @@ -75,111 +75,111 @@ protected int tickTime = 10000; // Each wait will check the test assertions each second protected String pingAction = "urn:wsrm:Ping"; protected String echoAction = "urn:wsrm:EchoString"; - - public SandeshaTestCase(String name) { - super(name); - String testRource = "target" + File.separator + "test-classes"; - resourceDir = new File(testRource).getPath(); - - String propFileStr = resourceDir + File.separator + PROPERTY_FILE_NAME; - properties = new Properties (); - - try { - FileInputStream propertyFile = new FileInputStream (new File(propFileStr)); + + public SandeshaTestCase(String name) { + super(name); + String testRource = "target" + File.separator + "test-classes"; + resourceDir = new File(testRource).getPath(); + + String propFileStr = resourceDir + File.separator + PROPERTY_FILE_NAME; + properties = new Properties(); + + try { + FileInputStream propertyFile = new FileInputStream(new File(propFileStr)); properties.load(propertyFile); } catch (FileNotFoundException e) { log.error(e); } catch (IOException e) { log.error(e); } - } - - public void setUp () throws Exception { + } + + public void setUp() throws Exception { super.setUp(); - + String serverPortStr = getTestProperty("test.server.port"); - if (serverPortStr!=null) { + if (serverPortStr != null) { try { serverPort = Integer.parseInt(serverPortStr); } catch (NumberFormatException e) { log.error(e); } } - } - + } + public ConfigurationContext startServer(String repoPath, String axis2_xml) - throws Exception { + throws Exception { ConfigurationContext configContext = - ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml); + ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath, axis2_xml); - httpServer = new SimpleHTTPServer (configContext,serverPort); + httpServer = new SimpleHTTPServer(configContext, serverPort); httpServer.start(); Thread.sleep(300); - + return configContext; } - public void tearDown () throws Exception { - if (httpServer!=null) { + public void tearDown() throws Exception { + if (httpServer != null) { httpServer.stop(); } - + Thread.sleep(300); } protected InputStreamReader getResource(String relativePath, String resourceName) { - String resourceFile = resourceDir + relativePath + File.separator + resourceName; - try { - FileReader reader = new FileReader(resourceFile); - return reader; - } catch (FileNotFoundException e) { - throw new RuntimeException("cannot load the test-resource", e); - } - } - - protected SOAPEnvelope getSOAPEnvelope() { - return OMAbstractFactory.getSOAP11Factory().getDefaultEnvelope(); - } - - protected SOAPEnvelope getSOAPEnvelope(String relativePath, String resourceName) { - try { - XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader( - getResource(relativePath, resourceName)); - OMXMLParserWrapper wrapper = OMXMLBuilderFactory.createStAXSOAPModelBuilder( - OMAbstractFactory.getSOAP11Factory(), reader); - return (SOAPEnvelope) wrapper.getDocumentElement(); - - } catch (XMLStreamException e) { - throw new RuntimeException(e); - } - } - - protected SOAPEnvelope getEmptySOAPEnvelope() { - return OMAbstractFactory.getSOAP11Factory().getDefaultEnvelope(); - } + String resourceFile = resourceDir + relativePath + File.separator + resourceName; + try { + FileReader reader = new FileReader(resourceFile); + return reader; + } catch (FileNotFoundException e) { + throw new RuntimeException("cannot load the test-resource", e); + } + } + + protected SOAPEnvelope getSOAPEnvelope() { + return OMAbstractFactory.getSOAP11Factory().getDefaultEnvelope(); + } + + protected SOAPEnvelope getSOAPEnvelope(String relativePath, String resourceName) { + try { + XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader( + getResource(relativePath, resourceName)); + OMXMLParserWrapper wrapper = OMXMLBuilderFactory.createStAXSOAPModelBuilder( + OMAbstractFactory.getSOAP11Factory(), reader); + return (SOAPEnvelope) wrapper.getDocumentElement(); + + } catch (XMLStreamException e) { + throw new RuntimeException(e); + } + } + + protected SOAPEnvelope getEmptySOAPEnvelope() { + return OMAbstractFactory.getSOAP11Factory().getDefaultEnvelope(); + } - protected static OMElement getEchoOMBlock(String text, String sequenceKey) { + protected static OMElement getEchoOMBlock(String text, String sequenceKey) { OMFactory fac = OMAbstractFactory.getOMFactory(); - OMNamespace applicationNamespace = fac.createOMNamespace(applicationNamespaceName,"ns1"); + OMNamespace applicationNamespace = fac.createOMNamespace(applicationNamespaceName, "ns1"); OMElement echoStringElement = fac.createOMElement(echoString, applicationNamespace); - OMElement textElem = fac.createOMElement(Text,applicationNamespace); - OMElement sequenceElem = fac.createOMElement(Sequence,applicationNamespace); - + OMElement textElem = fac.createOMElement(Text, applicationNamespace); + OMElement sequenceElem = fac.createOMElement(Sequence, applicationNamespace); + textElem.setText(text); sequenceElem.setText(sequenceKey); echoStringElement.addChild(textElem); echoStringElement.addChild(sequenceElem); - + return echoStringElement; } - + protected OMElement getPingOMBlock(String text) { OMFactory fac = OMAbstractFactory.getOMFactory(); - OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName,"ns1"); + OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName, "ns1"); OMElement pingElem = fac.createOMElement(ping, namespace); OMElement textElem = fac.createOMElement(Text, namespace); - + textElem.setText(text); pingElem.addChild(textElem); @@ -189,39 +189,39 @@ protected String checkEchoOMBlock(OMElement response) { assertEquals("Response namespace", applicationNamespaceName, response.getNamespace().getNamespaceURI()); assertEquals("Response local name", echoStringResponse, response.getLocalName()); - - OMElement echoStringReturnElem = response.getFirstChildWithName(new QName (applicationNamespaceName,EchoStringReturn)); + + OMElement echoStringReturnElem = response.getFirstChildWithName(new QName(applicationNamespaceName, EchoStringReturn)); assertNotNull("Echo String Return", echoStringReturnElem); - + String resultStr = echoStringReturnElem.getText(); return resultStr; } - public String getTestProperty (String key) { - if (properties!=null) - return properties.getProperty(key); - - return null; - } - - public void overrideConfigurationContext (ConfigurationContext context,MessageReceiver messageReceiver, String operationName, boolean newOperation, int mep) throws Exception { - - - AxisService rmService = context.getAxisConfiguration().getService(RMServiceName); - - AxisOperation operation = null; - - if (newOperation) { - operation = rmService.getOperation(new QName (operationName)); - if (operation==null) - throw new Exception ("Given operation not found"); - } else { - operation = AxisOperationFactory.getAxisOperation(mep); - rmService.addOperation(operation); - } - - operation.setMessageReceiver(messageReceiver); - } + public String getTestProperty(String key) { + if (properties != null) + return properties.getProperty(key); + + return null; + } + + public void overrideConfigurationContext(ConfigurationContext context, MessageReceiver messageReceiver, String operationName, boolean newOperation, int mep) throws Exception { + + + AxisService rmService = context.getAxisConfiguration().getService(RMServiceName); + + AxisOperation operation = null; + + if (newOperation) { + operation = rmService.getOperation(new QName(operationName)); + if (operation == null) + throw new Exception("Given operation not found"); + } else { + operation = AxisOperationFactory.getAxisOperation(mep); + rmService.addOperation(operation); + } + + operation.setMessageReceiver(messageReceiver); + } protected class TestCallback implements AxisCallback { @@ -229,23 +229,23 @@ boolean completed = false; boolean errorReported = false; String resultStr; - + public boolean isComplete() { return completed; } - + public boolean isErrorReported() { return errorReported; } - public String getResult () { + public String getResult() { return resultStr; } - - public TestCallback (String name) { + + public TestCallback(String name) { this.name = name; } - + public void onComplete() { completed = true; } @@ -256,13 +256,13 @@ this.resultStr = checkEchoOMBlock(contents); System.out.println("TestCallback got text: '" + resultStr + "'"); } - + public void onFault(MessageContext result) { errorReported = true; System.out.println("TestCallback got fault: " + result.getEnvelope()); } - public void onError (Exception e) { + public void onError(Exception e) { errorReported = true; System.out.println("TestCallback got exception"); e.printStackTrace(); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
