Author: rmannibucau
Date: Tue Jul 24 22:04:16 2012
New Revision: 1365321
URL: http://svn.apache.org/viewvc?rev=1365321&view=rev
Log:
using java.util.concurrent to lock on xaresource and adding some tests (ut and
// threads)
Added:
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
Modified:
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java
Modified:
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java
URL:
http://svn.apache.org/viewvc/openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java?rev=1365321&r1=1365320&r2=1365321&view=diff
==============================================================================
---
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java
(original)
+++
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java
Tue Jul 24 22:04:16 2012
@@ -5,25 +5,35 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
-// seems synchronized is faster than java.util.concurrent for 1 or 2 threads
-// so should be fine here
-// moreover all operations are very short so synchronized is faster
public class LocalXAResource implements XAResource {
private final Connection connection;
private Xid currentXid;
private boolean originalAutoCommit;
+ private final Lock lock = new ReentrantLock();
public LocalXAResource(final Connection localTransaction) {
connection = localTransaction;
}
- public synchronized Xid getXid() {
+ public Xid getXid() {
+ checkLock();
return currentXid;
}
@Override
- public synchronized void start(final Xid xid, int flag) throws XAException
{
+ public void start(final Xid xid, int flag) throws XAException {
+ try {
+ if (!lock.tryLock(10, TimeUnit.MINUTES)) {
+
+ }
+ } catch (InterruptedException e) {
+ throw (XAException) new XAException("can't get
lock").initCause(cantGetLock());
+ }
+
if (flag == XAResource.TMNOFLAGS) {
if (currentXid != null) {
throw new XAException("Already enlisted in another transaction
with xid " + xid);
@@ -52,18 +62,28 @@ public class LocalXAResource implements
}
}
+    /**
+     * Builds (but does not throw) the exception used to report that the lock guarding this
+     * resource could not be obtained.
+     *
+     * @return an {@link IllegalStateException} naming the current Xid and the calling thread
+     */
+    private RuntimeException cantGetLock() {
+        final String threadName = Thread.currentThread().getName();
+        return new IllegalStateException(
+                "can't get lock on resource with Xid " + currentXid + " from thread " + threadName);
+    }
+
@Override
- public synchronized void end(final Xid xid, int flag) throws XAException {
- if (xid == null) {
- throw new NullPointerException("xid is null");
- }
- if (!this.currentXid.equals(xid)) {
- throw new XAException("Invalid Xid: expected " + this.currentXid +
", but was " + xid);
+ public void end(final Xid xid, int flag) throws XAException {
+ try {
+ if (xid == null) {
+ throw new NullPointerException("xid is null");
+ }
+ if (!this.currentXid.equals(xid)) {
+ throw new XAException("Invalid Xid: expected " +
this.currentXid + ", but was " + xid);
+ }
+ } finally {
+ lock.unlock();
}
}
@Override
- public synchronized int prepare(final Xid xid) {
+ public int prepare(final Xid xid) {
+ checkLock();
+
try {
if (connection.isReadOnly()) {
connection.setAutoCommit(originalAutoCommit);
@@ -77,7 +97,9 @@ public class LocalXAResource implements
}
@Override
- public synchronized void commit(final Xid xid, boolean flag) throws
XAException {
+ public void commit(final Xid xid, boolean flag) throws XAException {
+ checkLock();
+
if (xid == null) {
throw new NullPointerException("xid is null");
}
@@ -106,7 +128,9 @@ public class LocalXAResource implements
}
@Override
- public synchronized void rollback(final Xid xid) throws XAException {
+ public void rollback(final Xid xid) throws XAException {
+ checkLock();
+
if (xid == null) {
throw new NullPointerException("xid is null");
}
@@ -134,7 +158,8 @@ public class LocalXAResource implements
}
@Override
- public synchronized void forget(final Xid xid) {
+ public void forget(final Xid xid) {
+ checkLock();
if (xid != null && currentXid.equals(xid)) {
currentXid = null;
}
@@ -154,4 +179,10 @@ public class LocalXAResource implements
public boolean setTransactionTimeout(int transactionTimeout) {
return false;
}
+
+    /**
+     * Verifies that no other thread currently owns the lock guarding this resource.
+     *
+     * @throws IllegalStateException (built by {@code cantGetLock()}) when the lock cannot be
+     *                               obtained immediately
+     */
+    private void checkLock() {
+        if (!lock.tryLock()) {
+            throw cantGetLock();
+        }
+        // tryLock() on a ReentrantLock acquires (or re-acquires) the lock on every successful
+        // call. end() unlocks only once, so release here to keep the hold count balanced and
+        // avoid leaving the lock owned by this thread forever.
+        lock.unlock();
+    }
}
Added:
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
URL:
http://svn.apache.org/viewvc/openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java?rev=1365321&view=auto
==============================================================================
---
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
(added)
+++
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
Tue Jul 24 22:04:16 2012
@@ -0,0 +1,231 @@
+package org.apache.openejb.resource.jdbc;
+
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.SingletonBean;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.junit.Configuration;
+import org.apache.openejb.junit.Module;
+import org.apache.openejb.resource.jdbc.managed.local.ManagedConnection;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.EJBContext;
+import javax.ejb.LocalBean;
+import javax.ejb.Singleton;
+import javax.sql.DataSource;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Exercises the JTA-managed "managed" datasource from 20 concurrent threads and checks that
+ * committed rows are visible, rolled-back rows are not, and that the per-transaction
+ * connection map of {@link ManagedConnection} is empty after each test.
+ */
+@RunWith(ApplicationComposer.class)
+public class MultiThreadedManagedDataSourceTest {
+    // mvcc otherwise multiple transaction tests will fail
+    private static final String URL = "jdbc:hsqldb:mem:multi-tx-managed;hsqldb.tx=MVCC";
+    private static final String USER = "sa";
+    private static final String PASSWORD = "";
+    private static final String TABLE = "PUBLIC.MULTI_TX_MANAGED_DATASOURCE_TEST";
+    private static final int INSERTS_NB = 200;
+
+    @EJB
+    private Persister persistManager;
+
+    /** Loads the HSQLDB driver and creates the single-column test table. */
+    @BeforeClass
+    public static void createTable() throws SQLException, ClassNotFoundException {
+        Class.forName("org.hsqldb.jdbcDriver");
+
+        final Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
+        try {
+            final Statement statement = connection.createStatement();
+            try {
+                statement.execute("CREATE TABLE " + TABLE + "(ID INTEGER)");
+            } finally {
+                statement.close();
+            }
+            connection.commit();
+        } finally {
+            // always release the connection, even when the DDL fails
+            connection.close();
+        }
+    }
+
+    /** Container configuration: declares the JTA-managed datasource named "managed". */
+    @Configuration
+    public Properties config() {
+        final Properties p = new Properties();
+        p.put("openejb.jdbc.datasource-creator", "dbcp-alternative");
+
+        p.put("managed", "new://Resource?type=DataSource");
+        p.put("managed.JdbcDriver", "org.hsqldb.jdbcDriver");
+        p.put("managed.JdbcUrl", URL);
+        p.put("managed.UserName", USER);
+        p.put("managed.Password", PASSWORD);
+        p.put("managed.JtaManaged", "true");
+        return p;
+    }
+
+    /** Deploys {@link Persister} as a local singleton bean. */
+    @Module
+    public EjbJar app() throws Exception {
+        return new EjbJar()
+                .enterpriseBean(new SingletonBean(Persister.class).localBean());
+    }
+
+    /** Singleton that inserts one row per call, using a fresh id taken from a shared counter. */
+    @LocalBean
+    @Singleton
+    public static class Persister {
+        private static final AtomicInteger ID = new AtomicInteger(1);
+
+        @Resource(name = "managed")
+        private DataSource ds;
+
+        @Resource
+        private EJBContext context;
+
+        /**
+         * Inserts a row with the next id.
+         *
+         * @return the id that was inserted
+         */
+        public int save() throws SQLException {
+            final int id = ID.getAndIncrement();
+            MultiThreadedManagedDataSourceTest.save(ds, id);
+            return id;
+        }
+
+        /**
+         * Inserts a row with the next id and marks the transaction rollback-only when
+         * {@code ok} is false.
+         *
+         * @param ok false to flag the current transaction for rollback
+         * @return the id that was inserted
+         */
+        public int saveRollback(boolean ok) throws SQLException {
+            final int id = ID.getAndIncrement();
+            MultiThreadedManagedDataSourceTest.save(ds, id);
+            if (!ok) {
+                context.setRollbackOnly();
+            }
+            return id;
+        }
+    }
+
+    /** Every concurrent insert must succeed and be visible afterwards. */
+    @Test
+    public void inserts() throws SQLException {
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicInteger fail = new AtomicInteger(0);
+        run(new Runnable() {
+            @Override
+            public void run() {
+                final int id;
+                try {
+                    id = persistManager.save();
+                } catch (SQLException e) {
+                    errors.incrementAndGet();
+                    return; // nothing was written, so there is no row to look for
+                }
+                try {
+                    if (!exists(id)) {
+                        fail.incrementAndGet();
+                    }
+                } catch (SQLException e) {
+                    errors.incrementAndGet();
+                }
+            }
+        });
+        assertEquals(0, errors.get());
+        assertEquals(0, fail.get());
+        assertEquals(INSERTS_NB, count(""));
+    }
+
+    /** Roughly half of the concurrent inserts are rolled back; only the others may remain. */
+    @Test
+    public void insertsWithRollback() throws SQLException {
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicInteger fail = new AtomicInteger(0);
+        final AtomicInteger ok = new AtomicInteger(0);
+        run(new Runnable() {
+            @Override
+            public void run() {
+                final boolean rollback = Math.random() > 0.5;
+                if (!rollback) {
+                    ok.incrementAndGet();
+                }
+                final int id;
+                try {
+                    id = persistManager.saveRollback(!rollback);
+                } catch (SQLException e) {
+                    errors.incrementAndGet();
+                    return; // nothing was written, so there is no row to look for
+                }
+                if (!rollback) {
+                    try {
+                        if (!exists(id)) {
+                            fail.incrementAndGet();
+                        }
+                    } catch (SQLException e) {
+                        errors.incrementAndGet();
+                    }
+                }
+            }
+        });
+        assertEquals(0, errors.get());
+        assertEquals(0, fail.get());
+        assertEquals(ok.get(), count(""));
+    }
+
+    /**
+     * Checks that no connection is still registered per transaction, then empties the table.
+     * The cleanup runs even when the check fails so that one failure does not skew the row
+     * counts of the next test.
+     */
+    @After
+    public void checkTxMapIsEmpty() throws Exception { // avoid memory leak
+        try {
+            final Field map = ManagedConnection.class.getDeclaredField("CONNECTION_BY_TX");
+            map.setAccessible(true);
+            final Map<?, ?> instance = (Map<?, ?>) map.get(null);
+            assertEquals(0, instance.size());
+        } finally {
+            execute(DriverManager.getConnection(URL, USER, PASSWORD), "DELETE FROM " + TABLE);
+        }
+    }
+
+    /** @return true when exactly one row with the given id is present */
+    private static boolean exists(int id) throws SQLException {
+        return count(" WHERE ID = " + id) == 1;
+    }
+
+    /**
+     * Counts the rows of the test table through a direct (non-managed) connection.
+     *
+     * @param where SQL suffix appended verbatim after the table name, may be empty
+     * @return the number of matching rows
+     */
+    private static int count(String where) throws SQLException {
+        final Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
+        try {
+            final Statement statement = connection.createStatement();
+            try {
+                final ResultSet result = statement.executeQuery("SELECT count(*) AS NB FROM " + TABLE + where);
+                try {
+                    assertTrue(result.next());
+                    return result.getInt(1);
+                } finally {
+                    result.close();
+                }
+            } finally {
+                statement.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Inserts one row with the given id through the managed datasource. */
+    private static void save(final DataSource ds, int id) throws SQLException {
+        execute(ds, "INSERT INTO " + TABLE + "(ID) VALUES(" + id + ")");
+    }
+
+    /** Runs an update on a connection borrowed from {@code ds}; the connection is always closed. */
+    private static void execute(final DataSource ds, final String sql) throws SQLException {
+        final Connection connection = ds.getConnection();
+        try {
+            final Statement statement = connection.createStatement();
+            try {
+                statement.executeUpdate(sql);
+            } finally {
+                statement.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Runs an update on the given connection and closes it, whether or not the update succeeds. */
+    private static void execute(final Connection connection, final String sql) throws SQLException {
+        try {
+            final Statement statement = connection.createStatement();
+            try {
+                statement.executeUpdate(sql);
+            } finally {
+                statement.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Submits {@code runnable} {@link #INSERTS_NB} times to a pool of 20 threads and waits for
+     * all tasks to complete. Fails the test if they do not finish within five minutes.
+     */
+    private void run(final Runnable runnable) {
+        final ExecutorService es = Executors.newFixedThreadPool(20);
+        for (int i = 0; i < INSERTS_NB; i++) {
+            es.submit(new Runnable() {
+                @Override
+                public void run() {
+                    runnable.run();
+                }
+            });
+        }
+        es.shutdown();
+        try {
+            // awaitTermination returns false on timeout; the assertions that follow would then
+            // run against unfinished work, so treat a timeout as a failure
+            if (!es.awaitTermination(5, TimeUnit.MINUTES)) {
+                es.shutdownNow();
+                fail("tasks did not complete within 5 minutes");
+            }
+        } catch (InterruptedException e) {
+            es.shutdownNow();
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the test runner
+            fail();
+        }
+    }
+}
Added:
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
URL:
http://svn.apache.org/viewvc/openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java?rev=1365321&view=auto
==============================================================================
---
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
(added)
+++
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
Tue Jul 24 22:04:16 2012
@@ -0,0 +1,227 @@
+package org.apache.openejb.resource.jdbc;
+
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.SingletonBean;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.junit.Configuration;
+import org.apache.openejb.junit.Module;
+import org.apache.openejb.resource.jdbc.managed.local.ManagedConnection;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.LocalBean;
+import javax.ejb.Singleton;
+import javax.ejb.TransactionManagement;
+import javax.ejb.TransactionManagementType;
+import javax.sql.DataSource;
+import javax.transaction.UserTransaction;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks commit and rollback of the JTA-managed "managed" datasource when transactions are
+ * driven through {@link UserTransaction} by bean-managed singletons, and that the
+ * per-transaction connection map of {@link ManagedConnection} is empty after each test.
+ */
+@RunWith(ApplicationComposer.class)
+public class UTManagedDataSourceTest {
+    // mvcc otherwise multiple transaction tests will fail
+    private static final String URL = "jdbc:hsqldb:mem:managed;hsqldb.tx=MVCC";
+    private static final String USER = "sa";
+    private static final String PASSWORD = "";
+    private static final String TABLE = "PUBLIC.MANAGED_DATASOURCE_TEST";
+
+    @EJB
+    private Persister persistManager;
+
+    /** Loads the HSQLDB driver and creates the single-column test table. */
+    @BeforeClass
+    public static void createTable() throws SQLException, ClassNotFoundException {
+        Class.forName("org.hsqldb.jdbcDriver");
+
+        final Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
+        try {
+            final Statement statement = connection.createStatement();
+            try {
+                statement.execute("CREATE TABLE " + TABLE + "(ID INTEGER)");
+            } finally {
+                statement.close();
+            }
+            connection.commit();
+        } finally {
+            // always release the connection, even when the DDL fails
+            connection.close();
+        }
+    }
+
+    /** Container configuration: declares the JTA-managed datasource named "managed". */
+    @Configuration
+    public Properties config() {
+        final Properties p = new Properties();
+        p.put("openejb.jdbc.datasource-creator", "dbcp-alternative");
+
+        p.put("managed", "new://Resource?type=DataSource");
+        p.put("managed.JdbcDriver", "org.hsqldb.jdbcDriver");
+        p.put("managed.JdbcUrl", URL);
+        p.put("managed.UserName", USER);
+        p.put("managed.Password", PASSWORD);
+        p.put("managed.JtaManaged", "true");
+        return p;
+    }
+
+    /** Deploys {@link Persister} and {@link OtherPersister} as local singleton beans. */
+    @Module
+    public EjbJar app() throws Exception {
+        return new EjbJar()
+                .enterpriseBean(new SingletonBean(Persister.class).localBean())
+                .enterpriseBean(new SingletonBean(OtherPersister.class).localBean());
+    }
+
+    /** Bean-managed singleton called from {@link Persister}; it writes ids 10 and 11. */
+    @LocalBean
+    @Singleton
+    @TransactionManagement(TransactionManagementType.BEAN)
+    public static class OtherPersister {
+        @Resource(name = "managed")
+        private DataSource ds;
+
+        @Resource
+        private UserTransaction ut;
+
+        /** Inserts id 10 and commits. */
+        public void save() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 10);
+            ut.commit();
+        }
+
+        /** Inserts id 11 and rolls back. */
+        public void saveAndRollback() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 11);
+            ut.rollback();
+        }
+    }
+
+    /** Bean-managed singleton under test; each method uses its own fixed ids (1 to 8). */
+    @LocalBean
+    @Singleton
+    @TransactionManagement(TransactionManagementType.BEAN)
+    public static class Persister {
+        @Resource(name = "managed")
+        private DataSource ds;
+
+        @EJB
+        private OtherPersister other;
+
+        @Resource
+        private UserTransaction ut;
+
+        /** Inserts id 1 and commits. */
+        public void save() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 1);
+            ut.commit();
+        }
+
+        /** Inserts id 2 and rolls back. */
+        public void saveAndRollback() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 2);
+            ut.rollback();
+        }
+
+        /** Inserts ids 3 and 4 in one transaction and commits. */
+        public void saveTwice() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 3);
+            UTManagedDataSourceTest.save(ds, 4);
+            ut.commit();
+        }
+
+        /** Inserts ids 5 and 6 in one transaction and rolls back. */
+        public void rollbackMultipleSave() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 5);
+            UTManagedDataSourceTest.save(ds, 6);
+            ut.rollback();
+        }
+
+        /** Inserts id 7, lets {@link OtherPersister#save()} insert id 10, then commits. */
+        public void saveInThisTxAndAnotherOne() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 7);
+            other.save();
+            ut.commit();
+        }
+
+        /**
+         * Inserts id 8, lets {@link OtherPersister#saveAndRollback()} insert and roll back
+         * id 11, then commits.
+         */
+        public void saveInThisTxAndRollbackInAnotherOne() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 8);
+            other.saveAndRollback();
+            ut.commit();
+        }
+    }
+
+    @Test
+    public void commit() throws Exception {
+        persistManager.save();
+        assertTrue(exists(1));
+    }
+
+    @Test
+    public void rollback() throws Exception {
+        persistManager.saveAndRollback();
+        assertFalse(exists(2));
+    }
+
+    @Test
+    public void commit2() throws Exception {
+        persistManager.saveTwice();
+        assertTrue(exists(3));
+        assertTrue(exists(4));
+    }
+
+    @Test
+    public void rollback2() throws Exception {
+        persistManager.rollbackMultipleSave();
+        assertFalse(exists(5));
+        assertFalse(exists(6));
+    }
+
+    @Test
+    public void saveDifferentTx() throws Exception {
+        persistManager.saveInThisTxAndAnotherOne();
+        assertTrue(exists(7));
+        assertTrue(exists(10));
+    }
+
+    @Test
+    public void saveRollbackDifferentTx() throws Exception {
+        persistManager.saveInThisTxAndRollbackInAnotherOne();
+        assertTrue(exists(8));
+        // OtherPersister.saveAndRollback() writes id 11; checking any other id would pass
+        // even if the rollback were lost
+        assertFalse(exists(11));
+    }
+
+    /** Checks that no connection is still registered per transaction once a test is over. */
+    @After
+    public void checkTxMapIsEmpty() throws Exception { // avoid memory leak
+        final Field map = ManagedConnection.class.getDeclaredField("CONNECTION_BY_TX");
+        map.setAccessible(true);
+        final Map<?, ?> instance = (Map<?, ?>) map.get(null);
+        assertEquals(0, instance.size());
+    }
+
+    /**
+     * Looks the id up through a direct (non-managed) connection.
+     *
+     * @return true when exactly one row with the given id is present
+     */
+    private static boolean exists(int id) throws Exception {
+        final Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
+        try {
+            final Statement statement = connection.createStatement();
+            try {
+                final ResultSet result = statement.executeQuery("SELECT count(*) AS NB FROM " + TABLE + " WHERE ID = " + id);
+                try {
+                    assertTrue(result.next());
+                    return result.getInt(1) == 1;
+                } finally {
+                    result.close();
+                }
+            } finally {
+                statement.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Inserts one row with the given id through the managed datasource. */
+    private static void save(final DataSource ds, int id) throws Exception {
+        execute(ds, "INSERT INTO " + TABLE + "(ID) VALUES(" + id + ")");
+    }
+
+    /** Runs an update on a connection borrowed from {@code ds}; the connection is always closed. */
+    private static void execute(final DataSource ds, final String sql) throws Exception {
+        final Connection connection = ds.getConnection();
+        try {
+            final Statement statement = connection.createStatement();
+            try {
+                statement.executeUpdate(sql);
+            } finally {
+                statement.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+}