Author: trustin
Date: Tue Mar 11 05:58:01 2008
New Revision: 635907

URL: http://svn.apache.org/viewvc?rev=635907&view=rev
Log:
Moved up all executor management to AbstractIoService - hence removed a lot of 
code duplication


Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
    
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java 
Tue Mar 11 05:58:01 2008
@@ -27,6 +27,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 
 /**
@@ -54,8 +55,8 @@
      */
     protected final Object bindLock = new Object();
 
-    protected AbstractIoAcceptor(IoSessionConfig sessionConfig) {
-        super(sessionConfig);
+    protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
+        super(sessionConfig, executor);
         defaultLocalAddresses.add(null);
     }
 

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java 
Tue Mar 11 05:58:01 2008
@@ -20,6 +20,7 @@
 package org.apache.mina.common;
 
 import java.net.SocketAddress;
+import java.util.concurrent.Executor;
 
 /**
  * A base implementation of {@link IoConnector}.
@@ -33,12 +34,11 @@
      * The minimum timeout value that is supported (in milliseconds).
      */
     private long connectTimeoutCheckInterval = 50L;
-    
-    private long connectTimeoutInMillis = 60*1000L; // 1 minute by default
+    private long connectTimeoutInMillis = 60 * 1000L; // 1 minute by default
     private SocketAddress defaultRemoteAddress;
 
-    protected AbstractIoConnector(IoSessionConfig sessionConfig) {
-        super(sessionConfig);
+    protected AbstractIoConnector(IoSessionConfig sessionConfig, Executor executor) {
+        super(sessionConfig, executor);
     }
 
     /**

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java 
Tue Mar 11 05:58:01 2008
@@ -23,8 +23,14 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.mina.util.NamePreservingRunnable;
+
 
 /**
  * Base implementation of {@link IoService}s.
@@ -56,6 +62,8 @@
             public void sessionDestroyed(IoSession session) {}
     };
     
+    private static final AtomicInteger id = new AtomicInteger();
+
     /**
      * Current filter chain builder.
      */
@@ -73,7 +81,10 @@
      * Maintains the {@link IoServiceListener}s of this service.
      */
     private final IoServiceListenerSupport listeners;
-    
+
+    private final Executor executor;
+    private final String threadName;
+    private final boolean createdExecutor;
     private final Object disposalLock = new Object();
     private volatile boolean disposing;
     private volatile boolean disposed;
@@ -124,7 +135,7 @@
      */
     private IoSessionConfig sessionConfig;
 
-    protected AbstractIoService(IoSessionConfig sessionConfig) {
+    protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
         if (sessionConfig == null) {
             throw new NullPointerException("sessionConfig");
         }
@@ -143,6 +154,16 @@
         // Make JVM load the exception monitor before some transports
         // change the thread context class loader.
         ExceptionMonitor.getInstance();
+        
+        if (executor == null) {
+            this.executor = Executors.newCachedThreadPool();
+            this.createdExecutor = true;
+        } else {
+            this.executor = executor;
+            this.createdExecutor = false;
+        }
+        
+        this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
     }
 
     public final IoFilterChainBuilder getFilterChainBuilder() {
@@ -200,6 +221,10 @@
                 } catch (Exception e) {
                     ExceptionMonitor.getInstance().exceptionCaught(e);
                 } finally {
+                    if (createdExecutor) {
+                        ((ExecutorService) executor).shutdown();
+                    }
+                    
                     if (disposalFuture == null) {
                         disposed = true;
                     }
@@ -701,6 +726,18 @@
     
     protected final IoServiceListenerSupport getListeners() {
         return listeners;
+    }
+    
+    protected final void executeWorker(Runnable worker) {
+        executeWorker(worker, null);
+    }
+    
+    protected final void executeWorker(Runnable worker, String suffix) {
+        String actualThreadName = threadName;
+        if (suffix != null) {
+            actualThreadName = actualThreadName + '-' + suffix;
+        }
+        executor.execute(new NamePreservingRunnable(worker, actualThreadName));
     }
     
     // TODO Figure out make it work without causing a compiler error / warning.

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
 Tue Mar 11 05:58:01 2008
@@ -30,14 +30,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.mina.util.NamePreservingRunnable;
 
 /**
  * {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -50,12 +42,7 @@
 
     private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
 
-    private static final AtomicInteger id = new AtomicInteger();
-
     private final Object lock = new Object();
-    private final Executor executor;
-    private final boolean createdExecutor;
-    private final String threadName;
     private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
     private final Queue<AcceptorOperationFuture> registerQueue = 
         new ConcurrentLinkedQueue<AcceptorOperationFuture>();
@@ -84,20 +71,8 @@
      * Creates a new instance.
      */
     protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
-        super(sessionConfig);
+        super(sessionConfig, executor);
 
-        threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
-        
-        if (executor == null) {
-            this.executor = new ThreadPoolExecutor(
-                    1, 1, 1L, TimeUnit.SECONDS,
-                    new LinkedBlockingQueue<Runnable>());
-            this.createdExecutor = true;
-        } else {
-            this.executor = executor;
-            this.createdExecutor = false;
-        }
-        
         try {
             init();
             selectable = true;
@@ -135,16 +110,8 @@
     protected IoFuture dispose0() throws Exception {
         unbind();
         if (!disposalFuture.isDone()) {
-            try {
-                startupWorker();
-                wakeup();
-            } catch (RejectedExecutionException e) {
-                if (createdExecutor) {
-                    // Ignore.
-                } else {
-                    throw e;
-                }
-            }
+            startupWorker();
+            wakeup();
         }
         return disposalFuture;
     }
@@ -307,8 +274,7 @@
         synchronized (lock) {
             if (worker == null) {
                 worker = new Worker();
-                executor.execute(
-                        new NamePreservingRunnable(worker, threadName));
+                executeWorker(worker);
             }
         }
     }
@@ -368,9 +334,6 @@
                     ExceptionMonitor.getInstance().exceptionCaught(e);
                 } finally {
                     disposalFuture.setValue(true);
-                    if (createdExecutor) {
-                        ((ExecutorService) executor).shutdown();
-                    }
                 }
             }
         }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
 Tue Mar 11 05:58:01 2008
@@ -30,14 +30,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.mina.util.NamePreservingRunnable;
 
 /**
  * @author The Apache MINA Project ([EMAIL PROTECTED])
@@ -46,11 +38,6 @@
 public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
         extends AbstractIoAcceptor {
 
-    private static final AtomicInteger id = new AtomicInteger();
-
-    private final Executor executor;
-    private final boolean createdExecutor;
-    private final String threadName;
     private final IoProcessor<T> processor;
     private final boolean createdProcessor;
 
@@ -89,23 +76,12 @@
     }
 
     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
-        super(sessionConfig);
+        super(sessionConfig, executor);
         
         if (processor == null) {
             throw new NullPointerException("processor");
         }
         
-        if (executor == null) {
-            this.executor = new ThreadPoolExecutor(
-                    1, 1, 1L, TimeUnit.SECONDS,
-                    new LinkedBlockingQueue<Runnable>());
-            this.createdExecutor = true;
-        } else {
-            this.executor = executor;
-            this.createdExecutor = false;
-        }
-
-        this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
         this.processor = processor;
         this.createdProcessor = createdProcessor;
         
@@ -141,16 +117,8 @@
     protected IoFuture dispose0() throws Exception {
         unbind();
         if (!disposalFuture.isDone()) {
-            try {
-                startupWorker();
-                wakeup();
-            } catch (RejectedExecutionException e) {
-                if (createdExecutor) {
-                    // Ignore.
-                } else {
-                    throw e;
-                }
-            }
+            startupWorker();
+            wakeup();
         }
         return disposalFuture;
     }
@@ -202,7 +170,7 @@
         synchronized (lock) {
             if (worker == null) {
                 worker = new Worker();
-                executor.execute(new NamePreservingRunnable(worker, threadName));
+                executeWorker(worker);
             }
         }
     }
@@ -279,9 +247,6 @@
                         ExceptionMonitor.getInstance().exceptionCaught(e);
                     } finally {
                         disposalFuture.setDone();
-                        if (createdExecutor) {
-                            ((ExecutorService) executor).shutdown();
-                        }
                     }
                 }
             }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
 Tue Mar 11 05:58:01 2008
@@ -25,14 +25,6 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.mina.util.NamePreservingRunnable;
 
 /**
  * @author The Apache MINA Project ([EMAIL PROTECTED])
@@ -41,12 +33,7 @@
 public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
         extends AbstractIoConnector {
 
-    private static final AtomicInteger id = new AtomicInteger();
-
     private final Object lock = new Object();
-    private final String threadName;
-    private final Executor executor;
-    private final boolean createdExecutor;
     private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
     private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
     private final IoProcessor<T> processor;
@@ -74,23 +61,12 @@
     }
 
     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
-        super(sessionConfig);
+        super(sessionConfig, executor);
         
         if (processor == null) {
             throw new NullPointerException("processor");
         }
         
-        if (executor == null) {
-            this.executor = new ThreadPoolExecutor(
-                    1, 1, 1L, TimeUnit.SECONDS,
-                    new LinkedBlockingQueue<Runnable>());
-            this.createdExecutor = true;
-        } else {
-            this.executor = executor;
-            this.createdExecutor = false;
-        }
-
-        this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
         this.processor = processor;
         this.createdProcessor = createdProcessor;
 
@@ -129,16 +105,8 @@
     @Override
     protected final IoFuture dispose0() throws Exception {
         if (!disposalFuture.isDone()) {
-            try {
-                startupWorker();
-                wakeup();
-            } catch (RejectedExecutionException e) {
-                if (createdExecutor) {
-                    // Ignore.
-                } else {
-                    throw e;
-                }
-            }
+            startupWorker();
+            wakeup();
         }
         return disposalFuture;
     }
@@ -192,7 +160,7 @@
         synchronized (lock) {
             if (worker == null) {
                 worker = new Worker();
-                executor.execute(new NamePreservingRunnable(worker, threadName));
+                executeWorker(worker);
             }
         }
     }
@@ -338,9 +306,6 @@
                         ExceptionMonitor.getInstance().exceptionCaught(e);
                     } finally {
                         disposalFuture.setDone();
-                        if (createdExecutor) {
-                            ((ExecutorService) executor).shutdown();
-                        }
                     }
                 }
             }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Tue 
Mar 11 05:58:01 2008
@@ -22,6 +22,7 @@
 import java.net.SocketAddress;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 /**
  * A dummy {@link IoSession} for unit-testing or non-network-use of
@@ -77,6 +78,9 @@
                 new AbstractIoSessionConfig() {
                     @Override
                     protected void doSetAll(IoSessionConfig config) {}
+                },
+                new Executor() {
+                    public void execute(Runnable command) {}
                 }) {
 
             @Override

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
 Tue Mar 11 05:58:01 2008
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import org.apache.mina.common.AbstractIoAcceptor;
 import org.apache.mina.common.IoFuture;
@@ -47,7 +48,14 @@
      * Creates a new instance.
      */
     public VmPipeAcceptor() {
-        super(new DefaultVmPipeSessionConfig());
+        this(null);
+    }
+    
+    /**
+     * Creates a new instance.
+     */
+    public VmPipeAcceptor(Executor executor) {
+        super(new DefaultVmPipeSessionConfig(), executor);
     }
 
     public TransportMetadata getTransportMetadata() {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=635907&r1=635906&r2=635907&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
 Tue Mar 11 05:58:01 2008
@@ -23,6 +23,7 @@
 import java.net.SocketAddress;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import org.apache.mina.common.AbstractIoConnector;
 import org.apache.mina.common.ConnectFuture;
@@ -49,7 +50,14 @@
      * Creates a new instance.
      */
     public VmPipeConnector() {
-        super(new DefaultVmPipeSessionConfig());
+        this(null);
+    }
+    
+    /**
+     * Creates a new instance.
+     */
+    public VmPipeConnector(Executor executor) {
+        super(new DefaultVmPipeSessionConfig(), executor);
     }
 
     public TransportMetadata getTransportMetadata() {


Reply via email to