Author: rgodfrey Date: Tue Mar 17 21:19:05 2015 New Revision: 1667409 URL: http://svn.apache.org/r1667409 Log: QPID-6457 : [Java Broker] Make asynchronous commits occur on executor threads
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Mar 17 21:19:05 2015 @@ -35,6 +35,17 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; @@ -109,11 +120,16 @@ public abstract class AbstractJDBCMessag " WHERE format = ? and global_id = ? and branch_id = ?"; protected final EventManager _eventManager = new EventManager(); + private ConfiguredObject<?> _parent; protected abstract boolean isMessageStoreOpen(); protected abstract void checkMessageStoreOpen(); + private ScheduledThreadPoolExecutor _executor; + public AbstractJDBCMessageStore() + { + } protected void setMaximumMessageId() { @@ -269,6 +285,34 @@ public abstract class AbstractJDBCMessag } } + protected void initMessageStore(final ConfiguredObject<?> parent) + { + _parent = parent; + _executor = new ScheduledThreadPoolExecutor(4, new ThreadFactory() + { + private final AtomicInteger _count = new AtomicInteger(); + @Override + public Thread newThread(final Runnable r) + { + final Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setName(parent.getName() + "-store-"+_count.incrementAndGet()); + return thread; + } + }); + _executor.prestartAllCoreThreads(); + + } + + @Override + public void closeMessageStore() + { + if(_executor != null) + { + _executor.shutdown(); + } + + } + protected abstract Logger getLogger(); protected abstract String getSqlBlobType(); @@ -835,10 +879,105 @@ public abstract class AbstractJDBCMessag } } - private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException + private FutureResult commitTranAsync(final ConnectionWrapper connWrapper) throws StoreException { - commitTran(connWrapper); - return FutureResult.IMMEDIATE_FUTURE; + final Future<?> result = _executor.submit(new Runnable() + { + @Override + public void run() + { + commitTran(connWrapper); + } + }); + return new FutureResult() + { + @Override + public boolean isComplete() + { + boolean done = result.isDone(); + try + { + result.get(); + } + catch (InterruptedException e) + { + // this won't happen as we're actually already done; + } + catch (ExecutionException e) + { + if(e.getCause() instanceof RuntimeException) + { + throw (RuntimeException)e.getCause(); + } + else if(e.getCause() instanceof Error) + { + throw (Error)e.getCause(); + } + else + { + throw new StoreException(e); + } + } + return done; + } + + @Override + public void waitForCompletion() + { + try + { + result.get(); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + catch (ExecutionException e) + { + if(e.getCause() instanceof RuntimeException) + { + throw (RuntimeException)e.getCause(); + } + else if(e.getCause() instanceof Error) + { + throw (Error)e.getCause(); + } + else + { + throw new StoreException(e); + } + } + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + try + { + result.get(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + catch (ExecutionException e) + { + if(e.getCause() instanceof RuntimeException) + { + throw (RuntimeException)e.getCause(); + } + else if(e.getCause() instanceof Error) + { + throw (Error)e.getCause(); + } + else + { + throw new StoreException(e); + } + } + } + }; } private void abortTran(ConnectionWrapper connWrapper) throws StoreException Modified: qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java (original) +++ qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java Tue Mar 17 21:19:05 2015 @@ -50,6 +50,7 @@ public abstract class AbstractDerbyMessa if (_messageStoreOpen.compareAndSet(false, true)) { _parent = parent; + initMessageStore(parent); DerbyUtils.loadDerbyDriver(); @@ -85,7 +86,14 @@ public abstract class AbstractDerbyMessa { if (_messageStoreOpen.compareAndSet(true, false)) { - doClose(); + try + { + doClose(); + } + finally + { + super.closeMessageStore(); + } } } Modified: qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java (original) +++ qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java Tue Mar 17 21:19:05 2015 @@ -76,6 +76,7 @@ public abstract class GenericAbstractJDB finally { doClose(); + super.closeMessageStore(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org