Author: trustin
Date: Wed Nov  7 21:35:34 2007
New Revision: 593015

URL: http://svn.apache.org/viewvc?rev=593015&view=rev
Log:
Related issue: DIRMINA-292 (Shared I/O processors.)
Forgot to check in SimpleIoProcessorPool

Added:
    
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java 
  (with props)

Added: 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java?rev=593015&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java 
(added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java 
Wed Nov  7 21:35:34 2007
@@ -0,0 +1,217 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
+ * {@link IoProcessor}s. Most current transport implementations use this pool internally
+ * to perform better in a multi-core environment, and therefore, you won't need to
+ * use this pool directly unless you are running multiple {@link IoService}s in the
+ * same JVM.
+ * <p>
+ * If you are running multiple {@link IoService}s, you may want to share the pool
+ * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
+ * instance by yourself and provide the pool as a constructor parameter when you
+ * create the services.
+ * <p>
+ * This pool uses the Java reflection API to create multiple {@link IoProcessor} instances.
+ * It tries to instantiate the processor in the following order:
+ * <ol>
+ * <li>A public constructor with one {@link ExecutorService} parameter.</li>
+ * <li>A public constructor with one {@link Executor} parameter.</li>
+ * <li>A public default constructor</li>
+ * </ol>
+ * 
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ * 
+ * @param <T> the type of the {@link IoSession} to be managed by the specified
+ *            {@link IoProcessor}.
+ */
+public class SimpleIoProcessorPool<T extends AbstractIoSession> implements IoProcessor<T> {
+
+    /** Default number of processors: the number of available CPU cores plus one. */
+    private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
+
+    /** Session attribute key under which the processor chosen by {@code add} is stored. */
+    private static final AttributeKey PROCESSOR =
+            new AttributeKey(SimpleIoProcessorPool.class, "processor");
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /** The pooled processors; entries are set to {@code null} once disposed. */
+    private final IoProcessor<T>[] pool;
+
+    /** Monotonic counter used to pick the next processor in round-robin order. */
+    private final AtomicInteger processorDistributor = new AtomicInteger();
+
+    /** The executor handed to processors whose constructor accepts one. */
+    private final Executor executor;
+
+    /** {@code true} if this pool created {@link #executor} itself and must shut it down. */
+    private final boolean createdExecutor;
+
+    private volatile boolean disposed;
+
+    /**
+     * Creates a pool of the default size that runs on a newly created cached
+     * thread pool, which is shut down when this pool is disposed.
+     *
+     * @param processorType the concrete processor class to instantiate
+     */
+    public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
+        this(processorType, null, DEFAULT_SIZE);
+    }
+
+    /**
+     * Creates a pool of the given size that runs on a newly created cached
+     * thread pool, which is shut down when this pool is disposed.
+     *
+     * @param processorType the concrete processor class to instantiate
+     * @param size          the number of processors to create; must be positive
+     */
+    public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, int size) {
+        this(processorType, null, size);
+    }
+
+    /**
+     * Creates a pool of the default size that uses the given executor.
+     *
+     * @param processorType the concrete processor class to instantiate
+     * @param executor      the executor to pass to the processors, or {@code null}
+     *                      to let the pool create (and later shut down) its own
+     */
+    public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor) {
+        this(processorType, executor, DEFAULT_SIZE);
+    }
+
+    /**
+     * Creates a pool of {@code size} processors of the given type.
+     * If any processor cannot be created, the processors created so far (and
+     * the executor, if this pool created it) are released via {@link #dispose()}
+     * before the exception propagates.
+     *
+     * @param processorType the concrete processor class to instantiate
+     * @param executor      the executor to pass to the processors, or {@code null}
+     *                      to let the pool create (and later shut down) its own
+     * @param size          the number of processors to create; must be positive
+     * @throws NullPointerException     if {@code processorType} is {@code null}
+     * @throws IllegalArgumentException if {@code size} is not positive, or if
+     *                                  {@code processorType} has no usable public constructor
+     * @throws RuntimeIoException       if a processor constructor fails with a checked exception
+     */
+    @SuppressWarnings("unchecked")
+    public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor, int size) {
+        if (processorType == null) {
+            throw new NullPointerException("processorType");
+        }
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    "size: " + size + " (expected: positive integer)");
+        }
+
+        // Only an executor created here is owned (and later shut down) by the pool.
+        this.createdExecutor = executor == null;
+        this.executor = executor == null ? Executors.newCachedThreadPool() : executor;
+
+        pool = new IoProcessor[size];
+
+        boolean success = false;
+        try {
+            for (int i = 0; i < pool.length; i++) {
+                pool[i] = createProcessor(processorType);
+            }
+            success = true;
+        } finally {
+            if (!success) {
+                // Release whatever was created before the failure.
+                dispose();
+            }
+        }
+    }
+
+    /**
+     * Instantiates one processor reflectively, trying in order: a public
+     * constructor taking an {@link ExecutorService} (only when the pool's
+     * executor actually is one), a public constructor taking an
+     * {@link Executor}, and a public default constructor.
+     *
+     * @param processorType the concrete processor class to instantiate
+     * @return the newly created processor
+     * @throws IllegalArgumentException if none of the constructors exists
+     * @throws RuntimeIoException       if a constructor fails with a checked exception
+     */
+    private IoProcessor<T> createProcessor(Class<? extends IoProcessor<T>> processorType) {
+        try {
+            // Passing a plain Executor to an ExecutorService constructor would make
+            // newInstance() fail with "argument type mismatch" instead of letting us
+            // fall back to the Executor constructor, so skip it in that case.
+            if (executor instanceof ExecutorService) {
+                try {
+                    return processorType.getConstructor(ExecutorService.class).newInstance(executor);
+                } catch (NoSuchMethodException e) {
+                    // To the next step...
+                }
+            }
+
+            try {
+                return processorType.getConstructor(Executor.class).newInstance(executor);
+            } catch (NoSuchMethodException e) {
+                // To the next step...
+            }
+
+            try {
+                return processorType.getConstructor().newInstance();
+            } catch (NoSuchMethodException e) {
+                // No proper constructor; reported below.
+            }
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeIoException(
+                    "Failed to create a new instance of " + processorType.getName(), e);
+        }
+
+        // Raise an exception if no proper constructor is found.
+        throw new IllegalArgumentException(
+                String.valueOf(processorType) + " must have a public constructor " +
+                "with one " + ExecutorService.class.getSimpleName() + " parameter, " +
+                "a public constructor with one " + Executor.class.getSimpleName() +
+                " parameter or a public default constructor.");
+    }
+
+    /**
+     * Assigns the session to the next processor in round-robin order, records
+     * that processor as a session attribute, and adds the session to it.
+     *
+     * @param session the session to register
+     * @throws IllegalStateException if this pool has been disposed
+     */
+    public void add(T session) {
+        IoProcessor<T> p = nextProcessor();
+        session.setAttribute(PROCESSOR, p);
+        p.add(session);
+    }
+
+    /**
+     * Delegates the flush to the processor recorded on the session.
+     * The session is expected to have been registered through {@code add}
+     * first; no check is made here.
+     *
+     * @param session the session to flush
+     */
+    @SuppressWarnings("unchecked")
+    public void flush(T session) {
+        IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
+        p.flush(session);
+    }
+
+    /**
+     * Clears the processor recorded on the session and removes the session
+     * from that processor.
+     * The session is expected to have been registered through {@code add}
+     * first; no check is made here.
+     *
+     * @param session the session to remove
+     */
+    @SuppressWarnings("unchecked")
+    public void remove(T session) {
+        IoProcessor<T> p = (IoProcessor<T>) session.removeAttribute(PROCESSOR);
+        p.remove(session);
+    }
+
+    /**
+     * Delegates the traffic mask update to the processor recorded on the session.
+     * The session is expected to have been registered through {@code add}
+     * first; no check is made here.
+     *
+     * @param session the session whose traffic mask changed
+     */
+    @SuppressWarnings("unchecked")
+    public void updateTrafficMask(T session) {
+        IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
+        p.updateTrafficMask(session);
+    }
+
+    /**
+     * Disposes every pooled processor (last to first) and, if this pool created
+     * its own executor, shuts that executor down. A failure to dispose one
+     * processor is logged and does not prevent the others from being disposed.
+     * Calls made after the pool is already marked disposed return immediately.
+     */
+    public void dispose() {
+        if (disposed) {
+            return;
+        }
+
+        disposed = true;
+        for (int i = pool.length - 1; i >= 0; i--) {
+            // Work on a local reference so the log message below never
+            // dereferences a slot that has already been cleared.
+            IoProcessor<T> processor = pool[i];
+            if (processor == null) {
+                continue;
+            }
+
+            try {
+                processor.dispose();
+            } catch (Exception e) {
+                logger.warn(
+                        "Failed to dispose a " + processor.getClass().getSimpleName() +
+                        " at index " + i + ".", e);
+            } finally {
+                pool[i] = null;
+            }
+        }
+
+        if (createdExecutor) {
+            // Safe cast: an executor created by this pool comes from
+            // Executors.newCachedThreadPool().
+            ((ExecutorService) executor).shutdown();
+        }
+    }
+
+    /**
+     * Picks the next processor in round-robin order.
+     *
+     * @return the processor to use for the next added session
+     * @throws IllegalStateException if this pool has been disposed
+     */
+    private IoProcessor<T> nextProcessor() {
+        checkDisposal();
+        // Math.abs(Integer.MIN_VALUE) is still negative, which would yield a
+        // negative index once the counter wraps; clearing the sign bit always
+        // gives a non-negative value.
+        int ticket = processorDistributor.getAndIncrement() & Integer.MAX_VALUE;
+        return pool[ticket % pool.length];
+    }
+
+    /**
+     * @throws IllegalStateException if this pool has been disposed
+     */
+    private void checkDisposal() {
+        if (disposed) {
+            throw new IllegalStateException("A disposed processor cannot be accessed.");
+        }
+    }
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to