Author: trustin
Date: Tue Mar 11 05:58:01 2008
New Revision: 635907
URL: http://svn.apache.org/viewvc?rev=635907&view=rev
Log:
Moved up all executor management to AbstractIoService - hence removed a lot of
code duplication
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
Tue Mar 11 05:58:01 2008
@@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Executor;
/**
@@ -54,8 +55,8 @@
*/
protected final Object bindLock = new Object();
- protected AbstractIoAcceptor(IoSessionConfig sessionConfig) {
- super(sessionConfig);
+ protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor
executor) {
+ super(sessionConfig, executor);
defaultLocalAddresses.add(null);
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
Tue Mar 11 05:58:01 2008
@@ -20,6 +20,7 @@
package org.apache.mina.common;
import java.net.SocketAddress;
+import java.util.concurrent.Executor;
/**
* A base implementation of {@link IoConnector}.
@@ -33,12 +34,11 @@
* The minimum timeout value that is supported (in milliseconds).
*/
private long connectTimeoutCheckInterval = 50L;
-
- private long connectTimeoutInMillis = 60*1000L; // 1 minute by default
+ private long connectTimeoutInMillis = 60 * 1000L; // 1 minute by default
private SocketAddress defaultRemoteAddress;
- protected AbstractIoConnector(IoSessionConfig sessionConfig) {
- super(sessionConfig);
+ protected AbstractIoConnector(IoSessionConfig sessionConfig, Executor
executor) {
+ super(sessionConfig, executor);
}
/**
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
Tue Mar 11 05:58:01 2008
@@ -23,8 +23,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.mina.util.NamePreservingRunnable;
+
/**
* Base implementation of {@link IoService}s.
@@ -56,6 +62,8 @@
public void sessionDestroyed(IoSession session) {}
};
+ private static final AtomicInteger id = new AtomicInteger();
+
/**
* Current filter chain builder.
*/
@@ -73,7 +81,10 @@
* Maintains the {@link IoServiceListener}s of this service.
*/
private final IoServiceListenerSupport listeners;
-
+
+ private final Executor executor;
+ private final String threadName;
+ private final boolean createdExecutor;
private final Object disposalLock = new Object();
private volatile boolean disposing;
private volatile boolean disposed;
@@ -124,7 +135,7 @@
*/
private IoSessionConfig sessionConfig;
- protected AbstractIoService(IoSessionConfig sessionConfig) {
+ protected AbstractIoService(IoSessionConfig sessionConfig, Executor
executor) {
if (sessionConfig == null) {
throw new NullPointerException("sessionConfig");
}
@@ -143,6 +154,16 @@
// Make JVM load the exception monitor before some transports
// change the thread context class loader.
ExceptionMonitor.getInstance();
+
+ if (executor == null) {
+ this.executor = Executors.newCachedThreadPool();
+ this.createdExecutor = true;
+ } else {
+ this.executor = executor;
+ this.createdExecutor = false;
+ }
+
+ this.threadName = getClass().getSimpleName() + '-' +
id.incrementAndGet();
}
public final IoFilterChainBuilder getFilterChainBuilder() {
@@ -200,6 +221,10 @@
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
+ if (createdExecutor) {
+ ((ExecutorService) executor).shutdown();
+ }
+
if (disposalFuture == null) {
disposed = true;
}
@@ -701,6 +726,18 @@
protected final IoServiceListenerSupport getListeners() {
return listeners;
+ }
+
+ protected final void executeWorker(Runnable worker) {
+ executeWorker(worker, null);
+ }
+
+ protected final void executeWorker(Runnable worker, String suffix) {
+ String actualThreadName = threadName;
+ if (suffix != null) {
+ actualThreadName = actualThreadName + '-' + suffix;
+ }
+ executor.execute(new NamePreservingRunnable(worker, actualThreadName));
}
// TODO Figure out make it work without causing a compiler error / warning.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
Tue Mar 11 05:58:01 2008
@@ -30,14 +30,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.mina.util.NamePreservingRunnable;
/**
* {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -50,12 +42,7 @@
private static final IoSessionRecycler DEFAULT_RECYCLER = new
ExpiringSessionRecycler();
- private static final AtomicInteger id = new AtomicInteger();
-
private final Object lock = new Object();
- private final Executor executor;
- private final boolean createdExecutor;
- private final String threadName;
private final IoProcessor<T> processor = new
ConnectionlessAcceptorProcessor();
private final Queue<AcceptorOperationFuture> registerQueue =
new ConcurrentLinkedQueue<AcceptorOperationFuture>();
@@ -84,20 +71,8 @@
* Creates a new instance.
*/
protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig
sessionConfig, Executor executor) {
- super(sessionConfig);
+ super(sessionConfig, executor);
- threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
-
- if (executor == null) {
- this.executor = new ThreadPoolExecutor(
- 1, 1, 1L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.createdExecutor = true;
- } else {
- this.executor = executor;
- this.createdExecutor = false;
- }
-
try {
init();
selectable = true;
@@ -135,16 +110,8 @@
protected IoFuture dispose0() throws Exception {
unbind();
if (!disposalFuture.isDone()) {
- try {
- startupWorker();
- wakeup();
- } catch (RejectedExecutionException e) {
- if (createdExecutor) {
- // Ignore.
- } else {
- throw e;
- }
- }
+ startupWorker();
+ wakeup();
}
return disposalFuture;
}
@@ -307,8 +274,7 @@
synchronized (lock) {
if (worker == null) {
worker = new Worker();
- executor.execute(
- new NamePreservingRunnable(worker, threadName));
+ executeWorker(worker);
}
}
}
@@ -368,9 +334,6 @@
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setValue(true);
- if (createdExecutor) {
- ((ExecutorService) executor).shutdown();
- }
}
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
Tue Mar 11 05:58:01 2008
@@ -30,14 +30,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.mina.util.NamePreservingRunnable;
/**
* @author The Apache MINA Project ([EMAIL PROTECTED])
@@ -46,11 +38,6 @@
public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
extends AbstractIoAcceptor {
- private static final AtomicInteger id = new AtomicInteger();
-
- private final Executor executor;
- private final boolean createdExecutor;
- private final String threadName;
private final IoProcessor<T> processor;
private final boolean createdProcessor;
@@ -89,23 +76,12 @@
}
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor
executor, IoProcessor<T> processor, boolean createdProcessor) {
- super(sessionConfig);
+ super(sessionConfig, executor);
if (processor == null) {
throw new NullPointerException("processor");
}
- if (executor == null) {
- this.executor = new ThreadPoolExecutor(
- 1, 1, 1L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.createdExecutor = true;
- } else {
- this.executor = executor;
- this.createdExecutor = false;
- }
-
- this.threadName = getClass().getSimpleName() + '-' +
id.incrementAndGet();
this.processor = processor;
this.createdProcessor = createdProcessor;
@@ -141,16 +117,8 @@
protected IoFuture dispose0() throws Exception {
unbind();
if (!disposalFuture.isDone()) {
- try {
- startupWorker();
- wakeup();
- } catch (RejectedExecutionException e) {
- if (createdExecutor) {
- // Ignore.
- } else {
- throw e;
- }
- }
+ startupWorker();
+ wakeup();
}
return disposalFuture;
}
@@ -202,7 +170,7 @@
synchronized (lock) {
if (worker == null) {
worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker,
threadName));
+ executeWorker(worker);
}
}
}
@@ -279,9 +247,6 @@
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
- if (createdExecutor) {
- ((ExecutorService) executor).shutdown();
- }
}
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
Tue Mar 11 05:58:01 2008
@@ -25,14 +25,6 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.mina.util.NamePreservingRunnable;
/**
* @author The Apache MINA Project ([EMAIL PROTECTED])
@@ -41,12 +33,7 @@
public abstract class AbstractPollingIoConnector<T extends AbstractIoSession,
H>
extends AbstractIoConnector {
- private static final AtomicInteger id = new AtomicInteger();
-
private final Object lock = new Object();
- private final String threadName;
- private final Executor executor;
- private final boolean createdExecutor;
private final Queue<ConnectionRequest> connectQueue = new
ConcurrentLinkedQueue<ConnectionRequest>();
private final Queue<ConnectionRequest> cancelQueue = new
ConcurrentLinkedQueue<ConnectionRequest>();
private final IoProcessor<T> processor;
@@ -74,23 +61,12 @@
}
private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor
executor, IoProcessor<T> processor, boolean createdProcessor) {
- super(sessionConfig);
+ super(sessionConfig, executor);
if (processor == null) {
throw new NullPointerException("processor");
}
- if (executor == null) {
- this.executor = new ThreadPoolExecutor(
- 1, 1, 1L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.createdExecutor = true;
- } else {
- this.executor = executor;
- this.createdExecutor = false;
- }
-
- this.threadName = getClass().getSimpleName() + '-' +
id.incrementAndGet();
this.processor = processor;
this.createdProcessor = createdProcessor;
@@ -129,16 +105,8 @@
@Override
protected final IoFuture dispose0() throws Exception {
if (!disposalFuture.isDone()) {
- try {
- startupWorker();
- wakeup();
- } catch (RejectedExecutionException e) {
- if (createdExecutor) {
- // Ignore.
- } else {
- throw e;
- }
- }
+ startupWorker();
+ wakeup();
}
return disposalFuture;
}
@@ -192,7 +160,7 @@
synchronized (lock) {
if (worker == null) {
worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker,
threadName));
+ executeWorker(worker);
}
}
}
@@ -338,9 +306,6 @@
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
- if (createdExecutor) {
- ((ExecutorService) executor).shutdown();
- }
}
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Tue
Mar 11 05:58:01 2008
@@ -22,6 +22,7 @@
import java.net.SocketAddress;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Executor;
/**
* A dummy {@link IoSession} for unit-testing or non-network-use of
@@ -77,6 +78,9 @@
new AbstractIoSessionConfig() {
@Override
protected void doSetAll(IoSessionConfig config) {}
+ },
+ new Executor() {
+ public void execute(Runnable command) {}
}) {
@Override
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
Tue Mar 11 05:58:01 2008
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoAcceptor;
import org.apache.mina.common.IoFuture;
@@ -47,7 +48,14 @@
* Creates a new instance.
*/
public VmPipeAcceptor() {
- super(new DefaultVmPipeSessionConfig());
+ this(null);
+ }
+
+ /**
+ * Creates a new instance.
+ */
+ public VmPipeAcceptor(Executor executor) {
+ super(new DefaultVmPipeSessionConfig(), executor);
}
public TransportMetadata getTransportMetadata() {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Tue Mar 11 05:58:01 2008
@@ -23,6 +23,7 @@
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoConnector;
import org.apache.mina.common.ConnectFuture;
@@ -49,7 +50,14 @@
* Creates a new instance.
*/
public VmPipeConnector() {
- super(new DefaultVmPipeSessionConfig());
+ this(null);
+ }
+
+ /**
+ * Creates a new instance.
+ */
+ public VmPipeConnector(Executor executor) {
+ super(new DefaultVmPipeSessionConfig(), executor);
}
public TransportMetadata getTransportMetadata() {