On Tue, Oct 28, 2008 at 11:26 AM, Amila Suriarachchi <
[EMAIL PROTECTED]> wrote:
>
>
> On Fri, Oct 24, 2008 at 7:54 PM, Thomas McKiernan <[EMAIL PROTECTED]>wrote:
>
>> I think Andrew was saying that there is already an implicit order for the
>> beans.
>
>
> yes this is correct. most of the transactions follows a same order except
> some transactions.
> What about making an explicit order so that we can change the code whenever
> found an exception.
>
> we can have an order like RMSBean, RMDBean, MessageBean, SenderBean.
>
>
>> The point is people keep changing the code and breaking that order.
>> Therefore this is not quite a hack but an attempt to make explicit what is
>> already implied in the open src code.
>>
>> It would not work for the distributed case but, assuming each distributed
>> thread was behaving correctly i.e. obeying the correct locking order then
>> this would not be an issue.
>> And this would catch any thread disobeying that ordering.
>
>
Please see the attached patch I have added some similar thing as you have
mentioned. Here I could not locate the InvokerBean.
Then I proceed with this change. I found a lot of places where it does not
follow the given order. In order to proceed I put another set of methods
called locks which are called earlier to make the sequence correct.
In this way I found a lot of places which does not follow this order. Of
course this may not be the correct order to follow.
Based on the above observations we can possibly have following options to
address this problem.
1. Although there are dead locks sequence get finished. I think this is the
way Sandesha2 is written. if there is an error it can roll back and proceed.
But lot of people may not like it.
2. To implement a possible deadlock detection system (As suggested above )
and remove them either using some dummy locks or reformatting the code. Here
reformatting the code may not feasible all the time.
Any thoughts?
thanks,
Amila.
>
> As I understood in your method what you try to do is to have an explicit
> order of accessing the
> beans and raise an exception if one transaction make an exception.
>
> In this case also don't we have to go and change the transaction to have it
> correct order (the explicit order you have define). Otherwise it keep on
> failing.
>
> But this will certainly improve the transaction debugging.
>
> thanks,
> Amila.
>
>
>
>>
>>
>> ----------------------------------
>> Thomas McKiernan
>>
>> WebSphere Messaging Development,
>> IBM United Kingdom Limited
>>
>> Internal Phone: 248241
>> External Phone: +44 (0)1962 818241
>> Mobile: +44 (0)789 1737497
>> Email: [EMAIL PROTECTED]
>>
>> Mail Point 211, IBM, Hursley Park, Winchester, Hampshire, England, SO21
>> 2JN
>>
>>
>> Caminante, no hay camino
>> Se hace camino al andar.
>> ("Walker, there is no path; the path is made by walking.") Antonio
>> Machado
>>
>>
>>
>> From:
>> "Amila Suriarachchi" <[EMAIL PROTECTED]>
>> To:
>> Thomas McKiernan/UK/[EMAIL PROTECTED]
>> Cc:
>> Andrew K Gatford/UK/[EMAIL PROTECTED], "[email protected]"
>> <[email protected]>
>> Date:
>> 24/10/2008 14:06
>> Subject:
>> Re: Sandesha2 synchronization and dead lock handling.
>>
>>
>>
>>
>>
>> On Fri, Oct 24, 2008 at 3:55 PM, Thomas McKiernan <[EMAIL PROTECTED]>
>> wrote:
>> How about a lock manager impl independent of any particular store's impl.
>> It could be abstract if necessary.
>>
>> Basically, this has a hierarchy of classes (beans) hard coded.
>> If you use a store to access a bean then the store impl's tran calls into
>> the independent lock manager.
>>
>> I feel this is a kind hack for the problem. And also as Andrew has
>> mentioned this won't work in a
>> distributed environment.
>> For me the correct solution is to go through all the transactions and make
>> an order of which
>> transactions access the beans. But apparently this is also seems to be
>> difficult since a lot
>> of transactions has start and commits.
>> So have to think bit more.
>>
>> thanks,
>> Amila.
>>
>>
>>
>>
>> Any attempt to enlist outside of the locking hierarchy results in a hard
>> runtime error and a rollback of the tran.
>>
>> Is this too naive?
>>
>> ----------------------------------
>> Thomas McKiernan
>>
>> WebSphere Messaging Development,
>> IBM United Kingdom Limited
>>
>> Internal Phone: 248241
>> External Phone: +44 (0)1962 818241
>> Mobile: +44 (0)789 1737497
>> Email: [EMAIL PROTECTED]
>>
>> Mail Point 211, IBM, Hursley Park, Winchester, Hampshire, England, SO21
>> 2JN
>>
>>
>> Caminante, no hay camino
>> Se hace camino al andar.
>> ("Walker, there is no path; the path is made by walking.") Antonio
>> Machado
>>
>>
>>
>> From:
>> Andrew K Gatford/UK/[EMAIL PROTECTED]
>> To:
>> "Amila Suriarachchi" <[EMAIL PROTECTED]>
>> Cc:
>> "[email protected]" <[email protected]>
>> Date:
>> 24/10/2008 11:07
>> Subject:
>> Re: Sandesha2 synchronization and dead lock handling.
>>
>>
>>
>> I went through similar pain when implementing a StorageManager and
>> encountered a number of deadlocks similar to the ones that you describe.
>> What I have gradually done is eliminate these in both the InMemory store
>> and my store by changing the ordering the beans were taken in.
>>
>> In general the beans are taken in this order.
>>
>> RMSBean or RMDBean followed by
>> SenderBean or InvokerBean.
>>
>> In cases where both the RMSBean and RMDBean are locked, they tend to be
>> taken in that order - RMS followed by RMD.
>> The one thing that I do know is that it is fairly easy to introduce new
>> deadlocks by slightly altering the order that beans are read.
>>
>> The one question I have is how does the jdbc store handle multiple threads
>>
>> accessing multiple sequences, or even a single sequence, but with multiple
>>
>> threads sending multiple requests. From my experience this is where we
>> have found a lot of problems in the InMemory store and I expect to be even
>>
>> more painful with a jdbc store.
>>
>> Andrew Gatford
>> Technical Project Lead
>> Websphere ESB Foundation Technologies
>> Hursley MP211
>> IBM United Kingdom Laboratories, Hursley Park, Winchester, SO21 2JN
>> Telephone :
>> Internal (7) 245743
>> External 01962 815743
>> Internet : [EMAIL PROTECTED]
>>
>>
>>
>> From:
>> "Amila Suriarachchi" <[EMAIL PROTECTED]>
>> To:
>> "[email protected]" <[email protected]>
>> Date:
>> 24/10/2008 10:30
>> Subject:
>> Sandesha2 synchronization and dead lock handling.
>>
>>
>>
>> hi all,
>>
>> This is regarding the issue [1].
>>
>> First of all as I learned Sandesha2 uses different beans to keep the state
>>
>> of the sequence and the messages. In a dual channel mode
>> different threads can access these beans and update them concurrently. So
>> the synchronization of these beans done by using the
>> storage level transactions. Therefore Sandesha2 needs an storage which
>> supports isolated transactions.
>>
>> To synchronize these beans the transactions must be completely isolated.
>> i.e It should not allow simultaneous reads of
>> same record from different transactions. Therefore I think the problem I
>> saw on[1] because not isolating the transactions properly.
>>
>> Then I increased the transaction isolation to fix the above problem. It
>> fixed that problem but results in dead locks.
>> The reason I believe for this dead locks is that different transactions
>> try to access the data base tables in different order.
>> But unfortunately I could not fix the issue.
>>
>> Normally these types of dead locks are prevented by accessing resources in
>>
>> same order. Does Sandesha2 follows such a order or any
>> other technique?
>>
>> Or is there any other reason for this dead locks and synchronization
>> problems? Can someone
>> have a better idea of Sandesha2 Design shed some light on this?
>>
>> thanks,
>> Amila.
>>
>>
>> [1] http://issues.apache.org/jira/browse/SANDESHA2-179
>> --
>> Amila Suriarachchi
>> WSO2 Inc.
>> blog: http://amilachinthaka.blogspot.com/
>>
>>
>>
>>
>>
>>
>> Unless stated otherwise above:
>> IBM United Kingdom Limited - Registered in England and Wales with number
>> 741598.
>> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>>
>>
>>
>>
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [EMAIL PROTECTED]
>> For additional commands, e-mail: [EMAIL PROTECTED]
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Unless stated otherwise above:
>> IBM United Kingdom Limited - Registered in England and Wales with number
>> 741598.
>> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Amila Suriarachchi
>> WSO2 Inc.
>> blog: http://amilachinthaka.blogspot.com/
>>
>>
>>
>>
>>
>>
>>
>> Unless stated otherwise above:
>> IBM United Kingdom Limited - Registered in England and Wales with number
>> 741598.
>> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>>
>>
>>
>>
>>
>>
>>
>
>
> --
> Amila Suriarachchi
> WSO2 Inc.
> blog: http://amilachinthaka.blogspot.com/
>
--
Amila Suriarachchi
WSO2 Inc.
blog: http://amilachinthaka.blogspot.com/
Index: src/main/java/org/apache/sandesha2/storage/jdbc/PersistentRMDBeanMgr.java
===================================================================
--- src/main/java/org/apache/sandesha2/storage/jdbc/PersistentRMDBeanMgr.java
(revision 708805)
+++ src/main/java/org/apache/sandesha2/storage/jdbc/PersistentRMDBeanMgr.java
(working copy)
@@ -245,7 +245,7 @@
throws SandeshaStorageException {
if (log.isDebugEnabled()) log.debug("delete RMSBean sequenceID
" + sequenceID);
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.RMD_BEAN).createStatement();
stmt.executeUpdate("delete from wsrm_rmd where
sequence_id='" + sequenceID + "'");
stmt.close();
} catch (Exception ex) {
@@ -259,7 +259,7 @@
String sql = requestForModel(bean);
ArrayList<RMDBean> lst = new ArrayList<RMDBean>();
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMD_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
lst.add(getBean(rs));
@@ -277,7 +277,7 @@
throws SandeshaStorageException {
log.debug("insert " + bean);
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("insert into wsrm_rmd(" +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.RMD_BEAN).prepareStatement("insert into
wsrm_rmd(" +
"sequence_id,to_epr_addr,to_epr,reply_to_epr_addr,reply_to_epr,acks_to_epr_addr,"
+
"acks_to_epr,rm_version,security_token_data," +
"last_activated_time,closed,terminated_flag,polling_mode,service_name," +
@@ -333,7 +333,7 @@
throws SandeshaStorageException {
RMDBean bean = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMD_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select * from
wsrm_rmd where sequence_id='" + sequenceID + "'");
if (! rs.next()) return bean;
bean = getBean(rs);
@@ -354,7 +354,7 @@
throws SandeshaStorageException {
log.debug("update " + bean);
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("update wsrm_rmd set " +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.RMD_BEAN).prepareStatement("update wsrm_rmd set
" +
"to_epr_addr=?,to_epr=?,reply_to_epr_addr=?,reply_to_epr=?,acks_to_epr_addr=?,"
+
"acks_to_epr=?,rm_version=?,security_token_data=?," +
"last_activated_time=?,closed=?,terminated_flag=?,polling_mode=?,service_name=?,"
+
@@ -409,7 +409,7 @@
String sql = requestForModel(bean);
RMDBean result = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMD_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
if (result == null) {
@@ -430,4 +430,16 @@
}
return result;
}
+
+ public void lock() throws SandeshaStorageException {
+ try {
+ Statement stmt =
getDbConnection(JDBCTransaction.RMD_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ ResultSet rs = stmt.executeQuery("select * from
wsrm_rmd");
+ rs.next();
+ rs.close();
+ stmt.close();
+ } catch (Exception ex) {
+ throw new SandeshaStorageException(ex);
+ }
+ }
}
Index:
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentStorageManager.java
===================================================================
---
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentStorageManager.java
(revision 708806)
+++
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentStorageManager.java
(working copy)
@@ -198,10 +198,10 @@
*
* @return Data Base Connection
*/
- public Connection getDbConnection() {
+ public Connection getDbConnection(int level) {
JDBCTransaction transaction = (JDBCTransaction)
threadTransaction.get();
if (transaction == null) return dbConnection;
- return transaction.getDbConnection();
+ return transaction.getDbConnection(level);
}
@@ -235,7 +235,7 @@
}
/**/
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.MESSAGE_BEAN).createStatement();
/**/
ResultSet rs = stmt.executeQuery("select * from
wsrm_msgctx where ctx_key='" +
key + "'");
@@ -259,7 +259,7 @@
if (log.isDebugEnabled()) log.debug("Enter storeMessageContext
for key " + key + " context " + msgContext);
storageMap.put(key, msgContext);
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("insert into
wsrm_msgctx(ctx_key,ctx)values(?,?)");
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.MESSAGE_BEAN).prepareStatement("insert into
wsrm_msgctx(ctx_key,ctx)values(?,?)");
pstmt.setString(1, key);
ByteArrayOutputStream baos = new
ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
@@ -280,7 +280,7 @@
storageMap.put(key, msgContext);
PreparedStatement pstmt = null;
try {
- pstmt = getDbConnection().prepareStatement("update
wsrm_msgctx set ctx=?" +
+ pstmt =
getDbConnection(JDBCTransaction.MESSAGE_BEAN).prepareStatement("update
wsrm_msgctx set ctx=?" +
"where ctx_key='" + key + "'");
ByteArrayOutputStream baos = new
ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
@@ -299,7 +299,7 @@
throws SandeshaStorageException {
if (log.isDebugEnabled()) log.debug("removeMessageContext key :
" + key);
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.MESSAGE_BEAN).createStatement();
MessageContext messageInCache = (MessageContext)
storageMap.get(key);
if (messageInCache != null) storageMap.remove(key);
stmt.executeUpdate("delete from wsrm_msgctx where
ctx_key='" + key + "'");
@@ -309,4 +309,17 @@
}
}
+ public void lockMessageBean() throws SandeshaStorageException {
+ try {
+ Statement stmt =
getDbConnection(JDBCTransaction.MESSAGE_BEAN).createStatement();
+ ResultSet rs = stmt.executeQuery("select * from
wsrm_msgctx");
+ rs.next();
+ rs.close();
+ stmt.close();
+ } catch (Exception ex) {
+ log.error("RetrieveMessageContext exception " + ex);
+ throw new SandeshaStorageException(ex);
+ }
+ }
+
}
Index: src/main/java/org/apache/sandesha2/storage/jdbc/JDBCTransaction.java
===================================================================
--- src/main/java/org/apache/sandesha2/storage/jdbc/JDBCTransaction.java
(revision 708805)
+++ src/main/java/org/apache/sandesha2/storage/jdbc/JDBCTransaction.java
(working copy)
@@ -20,6 +20,8 @@
package org.apache.sandesha2.storage.jdbc;
import java.sql.*;
+import java.util.List;
+import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,19 +39,44 @@
private boolean active = false;
private Log log = LogFactory.getLog(getClass());
+ public static final int RMS_BEAN = 1;
+ public static final int RMD_BEAN = 2;
+ public static final int MESSAGE_BEAN = 3;
+ public static final int SENDER_BEAN = 4;
+ public static final int INVOKER_BEAN = 5;
+
+
+ private int currentLevel = 0;
+ private List grantedLevels = new ArrayList();
+
public JDBCTransaction(PersistentStorageManager pmgr) {
log.debug("new JDBCTransaction");
try {
this.pmgr = pmgr;
dbConnection = pmgr.dbConnect();
dbConnection.setAutoCommit(false);
-
dbConnection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+
dbConnection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
active = true;
} catch (Exception ex) {
}
}
- public Connection getDbConnection() {
+ public Connection getDbConnection(int level) {
+
+ if (level != INVOKER_BEAN) {
+ if (currentLevel <= level) {
+ this.grantedLevels.add(level);
+ currentLevel = level;
+ } else {
+ if (!this.grantedLevels.contains(level)) {
+ // this is a non granted lower level
+ if (level != INVOKER_BEAN) {
+ throw new
RuntimeException("Current level " + currentLevel + " Grant level " + level);
+ }
+ }
+ }
+ }
+
return dbConnection;
}
Index:
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentInvokerBeanMgr.java
===================================================================
---
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentInvokerBeanMgr.java
(revision 708805)
+++
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentInvokerBeanMgr.java
(working copy)
@@ -81,7 +81,7 @@
public boolean delete(String key)
throws SandeshaStorageException {
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.INVOKER_BEAN).createStatement();
stmt.executeUpdate("delete from wsrm_invoker where
message_context_ref_key='" + key + "'");
stmt.close();
} catch (Exception ex) {
@@ -95,7 +95,7 @@
String sql = requestForModel(bean);
ArrayList<InvokerBean> lst = new ArrayList<InvokerBean>();
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.INVOKER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
lst.add(getInvokerBean(rs));
@@ -111,7 +111,7 @@
public boolean insert(InvokerBean bean)
throws SandeshaStorageException {
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("insert into
wsrm_invoker(message_context_ref_key," +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.INVOKER_BEAN).prepareStatement("insert into
wsrm_invoker(message_context_ref_key," +
"sequence_id,context,msg_no,flags)values(?,?,?,?,?)");
pstmt.setString(1, bean.getMessageContextRefKey());
pstmt.setString(2, bean.getSequenceID());
@@ -132,7 +132,7 @@
throws SandeshaStorageException {
InvokerBean invokerBean = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.INVOKER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select * from
wsrm_invoker where message_context_ref_key='" + key + "'");
if (! rs.next()) return invokerBean;
invokerBean = getInvokerBean(rs);
@@ -147,7 +147,7 @@
public boolean update(InvokerBean bean)
throws SandeshaStorageException {
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("update wsrm_invoker set " +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.INVOKER_BEAN).prepareStatement("update
wsrm_invoker set " +
"sequence_id=?,context=?,msg_no=?,flags=? where message_context_ref_key='" +
bean.getMessageContextRefKey() + "'");
pstmt.setString(1, bean.getSequenceID());
pstmt.setLong(3, bean.getMsgNo());
@@ -168,7 +168,7 @@
String sql = requestForModel(bean);
InvokerBean result = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.INVOKER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
if (result == null) {
Index: src/main/java/org/apache/sandesha2/storage/jdbc/PersistentBeanMgr.java
===================================================================
--- src/main/java/org/apache/sandesha2/storage/jdbc/PersistentBeanMgr.java
(revision 708805)
+++ src/main/java/org/apache/sandesha2/storage/jdbc/PersistentBeanMgr.java
(working copy)
@@ -33,14 +33,17 @@
Log log = LogFactory.getLog(getClass());
PersistentStorageManager pmgr = null;
+
public PersistentBeanMgr(PersistentStorageManager pmgr) {
this.pmgr = pmgr;
}
- public Connection getDbConnection() {
- return pmgr.getDbConnection();
+ public Connection getDbConnection(int level) {
+ return pmgr.getDbConnection(level);
}
+
+
protected Object getObject(ResultSet rs, String field)
throws Exception {
// MySQL JDBC connector returns a byte array
Index:
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentSenderBeanMgr.java
===================================================================
---
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentSenderBeanMgr.java
(revision 708805)
+++
src/main/java/org/apache/sandesha2/storage/jdbc/PersistentSenderBeanMgr.java
(working copy)
@@ -181,7 +181,7 @@
throws SandeshaStorageException {
if (log.isDebugEnabled()) log.debug("Delete MsgID " +
messageID);
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement();
stmt.executeUpdate("delete from wsrm_sender where
message_id='" + messageID + "'");
stmt.close();
} catch (Exception ex) {
@@ -195,7 +195,7 @@
String sql = requestForModel(bean);
ArrayList<SenderBean> lst = new ArrayList<SenderBean>();
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
lst.add(getBean(rs));
@@ -213,7 +213,7 @@
throws SandeshaStorageException {
ArrayList<SenderBean> lst = new ArrayList<SenderBean>();
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select * from
wsrm_sender where internal_sequence_id='" +
internalSequenceId + "'");
while (rs.next()) {
@@ -231,7 +231,7 @@
throws SandeshaStorageException {
SenderBean result = null;
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement();
String sql = "select * from wsrm_sender where
sequence_id";
if (sequenceId == null) sql += " is null ";
else sql += "='" + sequenceId + "' ";
@@ -260,7 +260,7 @@
throws SandeshaStorageException {
if (log.isDebugEnabled()) log.debug("Insert " + bean);
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("insert into wsrm_sender(" +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).prepareStatement("insert into
wsrm_sender(" +
"message_id, message_context_ref_key,
internal_sequence_id, sequence_id," +
"to_address, inbound_sequence_id, send,
sent_count, message_number, resend," +
"time_to_send, message_type,
last_message, inbound_message_number, transport_available," +
@@ -293,7 +293,7 @@
throws SandeshaStorageException {
SenderBean bean = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select * from
wsrm_sender where message_id='" + messageID + "'");
if (! rs.next()) return bean;
bean = getBean(rs);
@@ -309,7 +309,7 @@
throws SandeshaStorageException {
if (log.isDebugEnabled()) log.debug("Update " + bean);
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("update wsrm_sender set " +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).prepareStatement("update
wsrm_sender set " +
"message_context_ref_key=?,
internal_sequence_id=?, sequence_id=?," +
"to_address=?, inbound_sequence_id=?,
send=?, sent_count=?, message_number=?, resend=?," +
"time_to_send=?, message_type=?,
last_message=?, inbound_message_number=?, transport_available=?," +
@@ -343,7 +343,7 @@
String sql = requestForModel(bean);
SenderBean result = null;
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
if (result == null) {
@@ -368,7 +368,7 @@
public SenderBean retrieveFromMessageRefKey(String
messageContextRefKey) {
SenderBean bean = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select * from
wsrm_sender where message_context_ref_key='" + messageContextRefKey + "'");
if (! rs.next()) return bean;
bean = getBean(rs);
@@ -383,7 +383,7 @@
public SenderBean retrieve(String sequnceId, long messageNumber) throws
SandeshaStorageException {
SenderBean bean = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.SENDER_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select * from
wsrm_sender where sequence_id='" + sequnceId + "' " +
" and message_number=" + messageNumber);
if (! rs.next()) return bean;
Index: src/main/java/org/apache/sandesha2/storage/jdbc/PersistentRMSBeanMgr.java
===================================================================
--- src/main/java/org/apache/sandesha2/storage/jdbc/PersistentRMSBeanMgr.java
(revision 708805)
+++ src/main/java/org/apache/sandesha2/storage/jdbc/PersistentRMSBeanMgr.java
(working copy)
@@ -322,7 +322,7 @@
throws SandeshaStorageException {
if (log.isDebugEnabled()) log.debug("delete RMSBean msgId " +
msgId);
try {
- Statement stmt = getDbConnection().createStatement();
+ Statement stmt =
getDbConnection(JDBCTransaction.RMS_BEAN).createStatement();
stmt.executeUpdate("delete from wsrm_rms where
create_seq_msg_id='" + msgId + "'");
stmt.close();
} catch (Exception ex) {
@@ -336,7 +336,7 @@
String sql = requestForModel(bean);
ArrayList<RMSBean> lst = new ArrayList<RMSBean>();
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMS_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
lst.add(getBean(rs));
@@ -354,7 +354,7 @@
throws SandeshaStorageException {
log.debug("insert RMSBean " + bean);
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("insert into wsrm_rms(" +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.RMS_BEAN).prepareStatement("insert into
wsrm_rms(" +
"create_seq_msg_id,sequence_id,to_epr_addr,to_epr,reply_to_epr_addr,reply_to_epr,acks_to_epr_addr,acks_to_epr,rm_version,security_token_data,"
+
"last_activated_time,closed,terminated_flag,polling_mode,service_name," +
"flags,id,internal_sequence_id,create_sequence_msg_store_key," +
@@ -427,7 +427,7 @@
log.debug("Retrieve msdId " + msgId);
RMSBean bean = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMS_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select * from
wsrm_rms where create_seq_msg_id='" + msgId + "'");
if (rs.next()) bean = getBean(rs);
rs.close();
@@ -446,7 +446,7 @@
log.debug("Update bean : " + bean);
}
try {
- PreparedStatement pstmt =
getDbConnection().prepareStatement("update wsrm_rms set " +
+ PreparedStatement pstmt =
getDbConnection(JDBCTransaction.RMS_BEAN).prepareStatement("update wsrm_rms set
" +
"sequence_id=?,to_epr_addr=?,to_epr=?,reply_to_epr_addr=?,reply_to_epr=?,acks_to_epr_addr=?,acks_to_epr=?,rm_version=?,security_token_data=?,"
+
"last_activated_time=?,closed=?,terminated_flag=?,polling_mode=?,service_name=?,"
+
"flags=?,id=?,internal_sequence_id=?,create_sequence_msg_store_key=?," +
@@ -519,7 +519,7 @@
String sql = requestForModel(bean);
RMSBean result = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMS_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
if (result == null) {
@@ -549,7 +549,7 @@
String sql = requestForModel(dummyBean);
RMSBean result = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMS_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
@@ -580,7 +580,7 @@
String sql = requestForModel(dummyBean);
RMSBean result = null;
try {
- Statement stmt =
getDbConnection().createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ Statement stmt =
getDbConnection(JDBCTransaction.RMS_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
if (result == null) {
@@ -603,4 +603,17 @@
log.debug("FindUnique RMSBean : " + result);
return result;
}
+
+ public void lock() throws SandeshaStorageException {
+ try {
+ Statement stmt =
getDbConnection(JDBCTransaction.RMS_BEAN).createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_UPDATABLE);
+ ResultSet rs = stmt.executeQuery("select * from
wsrm_rms");
+ rs.next();
+ rs.close();
+ stmt.close();
+ } catch (Exception ex) {
+ log.error("Exception in getting lock " + ex);
+ throw new SandeshaStorageException("Exception in
PersistentRMSBeanManager::getting lock", ex);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]