Author: trustin
Date: Thu Nov  8 20:37:42 2007
New Revision: 593420

URL: http://svn.apache.org/viewvc?rev=593420&view=rev
Log:
* Added IoEventQueueHandler 
* OrderedThreadPoolExecutor invokes IoEventQueueHandler on any queue events now.

Added:
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
   (with props)
Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?rev=593420&r1=593419&r2=593420&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
 Thu Nov  8 20:37:42 2007
@@ -26,10 +26,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
 
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoEventType;
@@ -103,6 +101,10 @@
  * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
  * </code></pre>
  * 
+ * <h2>Preventing {@link OutOfMemoryError}</h2>
+ * 
+ * Please refer to {@link OrderedThreadPoolExecutor} and {@link IoEventQueueHandler}.
+ * 
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
  */
@@ -152,8 +154,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            RejectedExecutionHandler handler) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), handler, (IoEventType[]) null);
+            IoEventQueueHandler queueHandler) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), queueHandler, (IoEventType[]) null);
     }
 
     /**
@@ -164,7 +166,7 @@
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, new AbortPolicy(), (IoEventType[]) null);
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, null, (IoEventType[]) null);
     }
 
     /**
@@ -174,8 +176,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
-        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, threadFactory, handler), true, (IoEventType[]) null);
+            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
+        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, threadFactory, queueHandler), true, (IoEventType[]) null);
     }
 
     /**
@@ -218,8 +220,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            RejectedExecutionHandler handler, IoEventType... eventTypes) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), handler, eventTypes);
+            IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), queueHandler, eventTypes);
     }
 
     /**
@@ -230,7 +232,7 @@
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory, IoEventType... eventTypes) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, new AbortPolicy(), eventTypes);
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, null, eventTypes);
     }
 
     /**
@@ -240,8 +242,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            ThreadFactory threadFactory, RejectedExecutionHandler handler, 
IoEventType... eventTypes) {
-        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, threadFactory, handler), true, eventTypes);
+            ThreadFactory threadFactory, IoEventQueueHandler queueHandler, 
IoEventType... eventTypes) {
+        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, threadFactory, queueHandler), true, eventTypes);
     }
     
     /**

Added: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java?rev=593420&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
 (added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
 Thu Nov  8 20:37:42 2007
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.EventListener;
+
+import org.apache.mina.common.IoEvent;
+
+/**
+ * Listens to all event queue operations occurring in {@link OrderedThreadPoolExecutor}.
+ * 
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public interface IoEventQueueHandler extends EventListener {
+    /**
+     * Returns <tt>true</tt> if and only if the specified <tt>event</tt> is
+     * allowed to be offered to the event queue.  The <tt>event</tt> is dropped
+     * if <tt>false</tt> is returned.
+     */
+    boolean accept(OrderedThreadPoolExecutor executor, IoEvent event);
+    
+    /**
+     * Invoked after the specified <tt>event</tt> has been offered to the
+     * event queue.
+     */
+    void offered(OrderedThreadPoolExecutor executor, IoEvent event);
+    
+    /**
+     * Invoked after the specified <tt>event</tt> has been polled from the
+     * event queue.
+     */
+    void polled(OrderedThreadPoolExecutor executor, IoEvent event);
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=593420&r1=593419&r2=593420&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
 Thu Nov  8 20:37:42 2007
@@ -49,8 +49,15 @@
 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
 
     private static final IoSession EXIT_SIGNAL = new DummySession();
+    private static final IoEventQueueHandler NOOP_QUEUE_MONITOR = new 
IoEventQueueHandler() {
+        public boolean accept(OrderedThreadPoolExecutor executor, IoEvent 
event) {
+            return true;
+        }
+        public void offered(OrderedThreadPoolExecutor executor, IoEvent event) 
{}
+        public void polled(OrderedThreadPoolExecutor executor, IoEvent event) 
{}
+    };
 
-    private final AttributeKey BUFFER = new AttributeKey(getClass(), "queue");
+    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
     private final BlockingQueue<IoSession> waitingSessions = new 
LinkedBlockingQueue<IoSession>();
     
     private final Set<Worker> workers = new HashSet<Worker>();
@@ -61,9 +68,10 @@
     private final AtomicInteger idleWorkers = new AtomicInteger();
     
     private long completedTaskCount;
-    
     private volatile boolean shutdown;
     
+    private volatile IoEventQueueHandler queueHandler;
+    
     public OrderedThreadPoolExecutor(int maximumPoolSize) {
         this(0, maximumPoolSize);
     }
@@ -80,22 +88,22 @@
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            RejectedExecutionHandler handler) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), handler);
+            IoEventQueueHandler queueMonitor) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), queueMonitor);
     }
 
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, new AbortPolicy());
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
threadFactory, null);
     }
 
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
-        super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), 
threadFactory, handler);
+            ThreadFactory threadFactory, IoEventQueueHandler queueMonitor) {
+        super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), 
threadFactory, new AbortPolicy());
         if (corePoolSize < 0) {
             throw new IllegalArgumentException("corePoolSize: " + 
corePoolSize);
         }
@@ -106,8 +114,25 @@
         
         this.corePoolSize = corePoolSize;
         this.maximumPoolSize = maximumPoolSize;
+        setQueueHandler(queueMonitor);
     }
     
+    public IoEventQueueHandler getQueueHandler() {
+        return queueHandler;
+    }
+
+    public void setQueueHandler(IoEventQueueHandler queueHandler) {
+        if (queueHandler == null) {
+            queueHandler = NOOP_QUEUE_MONITOR;
+        }
+        this.queueHandler = queueHandler;
+    }
+
+    @Override
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+        // Ignore the request.  It must always be AbortPolicy.
+    }
+
     private void addWorker() {
         synchronized (workers) {
             if (workers.size() >= maximumPoolSize) {
@@ -253,10 +278,15 @@
         Queue<Runnable> queue = buf.queue;
         boolean offer;
         synchronized (queue) {
-            queue.offer(e);
-            if (buf.processingCompleted) {
-                buf.processingCompleted = false;
-                offer = true;
+            if (queueHandler.accept(this, e)) {
+                queue.offer(e);
+                queueHandler.offered(this, e);
+                if (buf.processingCompleted) {
+                    buf.processingCompleted = false;
+                    offer = true;
+                } else {
+                    offer = false;
+                }
             } else {
                 offer = false;
             }
@@ -345,7 +375,12 @@
             }
         }
     }
-
+    
+    @Override
+    public BlockingQueue<Runnable> getQueue() {
+        throw new UnsupportedOperationException("Please use getQueue(Runnable) 
instead.");
+    }
+    
     @Override
     public void purge() {
     }
@@ -482,6 +517,8 @@
                         break;
                     }
                 }
+
+                queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) 
task);
 
                 runTask(task);
             }


Reply via email to