Author: dblevins
Date: Tue Aug 21 20:28:29 2012
New Revision: 1375764

URL: http://svn.apache.org/viewvc?rev=1375764&view=rev
Log:
TOMEE-398 - Unified Executor configuration options (@Asynchronous, @Timeout)

Added:
    
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
    
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/
    
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
Modified:
    
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
    
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
    
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java

Modified: 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java?rev=1375764&r1=1375763&r2=1375764&view=diff
==============================================================================
--- 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
 (original)
+++ 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
 Tue Aug 21 20:28:29 2012
@@ -18,25 +18,19 @@ package org.apache.openejb.async;
 
 import org.apache.openejb.AppContext;
 import org.apache.openejb.core.ThreadContext;
-import org.apache.openejb.loader.Options;
 import org.apache.openejb.util.DaemonThreadFactory;
-import org.apache.openejb.util.Duration;
+import org.apache.openejb.util.ExecutorBuilder;
 
 import javax.ejb.EJBException;
 import javax.ejb.NoSuchEJBException;
 import java.rmi.NoSuchObjectException;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -50,61 +44,19 @@ public class AsynchronousPool {
     private final BlockingQueue<Runnable> blockingQueue;
     private final ExecutorService executor;
 
-    public AsynchronousPool(String id, int corePoolSize, int maximumPoolSize, 
Duration keepAliveTime, BlockingQueue<Runnable> blockingQueue) {
-        blockingQueue = new LinkedBlockingQueue<Runnable>();
-        final TimeUnit unit = (keepAliveTime.getUnit() != null) ? 
keepAliveTime.getUnit() : TimeUnit.SECONDS;
-        this.blockingQueue = blockingQueue;
-        this.executor = new ThreadPoolExecutor(
-                corePoolSize,
-                maximumPoolSize,
-                keepAliveTime.getTime(),
-                unit, blockingQueue, new DaemonThreadFactory("@Asynchronous", 
id.trim()));
+    public AsynchronousPool(ThreadPoolExecutor threadPoolExecutor) {
+        this.blockingQueue = threadPoolExecutor.getQueue();
+        this.executor = threadPoolExecutor;
     }
 
     public static AsynchronousPool create(AppContext appContext) {
-        final Options options = appContext.getOptions();
 
-        final String id = appContext.getId();
-        final int corePoolSize = options.get("AsynchronousPool.CorePoolSize", 
10);
-        final int maximumPoolSize = 
Math.max(options.get("AsynchronousPool.MaximumPoolSize", 20), corePoolSize);
-        final Duration keepAliveTime = 
options.get("AsynchronousPool.KeepAliveTime", new Duration(60, 
TimeUnit.SECONDS));
-        final BlockingQueue queue = options.get("AsynchronousPool.QueueType", 
QueueType.LINKED).create(options);
+        final ExecutorBuilder builder = new ExecutorBuilder()
+                .prefix("AsynchronousPool")
+                .size(10)
+                .threadFactory(new DaemonThreadFactory("@Asynchronous", 
appContext.getId()));
 
-        return new AsynchronousPool(id, corePoolSize, maximumPoolSize, 
keepAliveTime, queue);
-    }
-
-    private static enum QueueType {
-        ARRAY,
-        DELAY,
-        LINKED,
-        PRIORITY,
-        SYNCHRONOUS;
-
-        public BlockingQueue create(Options options) {
-            switch (this) {
-                case ARRAY: {
-                    return new 
ArrayBlockingQueue(options.get("AsynchronousPool.QueueSize", 100));
-                }
-                case DELAY: {
-                    return new DelayQueue();
-                }
-                case LINKED: {
-                    return new 
LinkedBlockingQueue(options.get("AsynchronousPool.QueueSize", 
Integer.MAX_VALUE));
-                }
-                case PRIORITY: {
-                    return new PriorityBlockingQueue();
-                }
-                case SYNCHRONOUS: {
-                    return new 
SynchronousQueue(options.get("AsynchronousPool.QueueFair", false));
-                }
-                default: {
-                    // The Options class will throw an error if the user 
supplies an unknown enum string
-                    // The only way we can reach this is if we add a new 
QueueType element and forget to
-                    // implement it in the above switch statement.
-                    throw new IllegalArgumentException("Unknown QueueType 
type: " + this);
-                }
-            }
-        }
+        return new AsynchronousPool(builder.build(appContext.getOptions()));
     }
 
     public Object invoke(Callable<Object> callable, boolean isVoid) throws 
Throwable {

Modified: 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java?rev=1375764&r1=1375763&r2=1375764&view=diff
==============================================================================
--- 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
 (original)
+++ 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
 Tue Aug 21 20:28:29 2012
@@ -17,19 +17,21 @@
 
 package org.apache.openejb.core.timer;
 
+import org.apache.openejb.loader.Options;
 import org.apache.openejb.loader.SystemInstance;
 import org.apache.openejb.util.DaemonThreadFactory;
+import org.apache.openejb.util.ExecutorBuilder;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
+import org.apache.openejb.util.executor.OfferRejectedExecutionHandler;
 import org.quartz.SchedulerConfigException;
 import org.quartz.spi.ThreadPool;
 
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * @version $Rev$ $Date$
@@ -62,41 +64,17 @@ public class DefaultTimerThreadPoolAdapt
     private final boolean threadPoolExecutorUsed;
 
     public DefaultTimerThreadPoolAdapter() {
-        this.executor = SystemInstance.get().getComponent(Executor.class);
+        final TimerExecutor timerExecutor = 
SystemInstance.get().getComponent(TimerExecutor.class);
 
-        if (this.executor == null) {
-
-            int size = 
Integer.parseInt(SystemInstance.get().getProperty(OPENEJB_TIMER_POOL_SIZE, 
"3"));
-            if (size < 3) {
-                size = 3;
-            }
-
-            this.executor = new ThreadPoolExecutor(size
-                    , size
-                    , 60L
-                    , TimeUnit.SECONDS
-                    , new LinkedBlockingQueue<Runnable>(size)
-                    , new 
DaemonThreadFactory(DefaultTimerThreadPoolAdapter.class)
-                    , new RejectedExecutionHandler() {
-                @Override
-                public void rejectedExecution(final Runnable r, final 
ThreadPoolExecutor tpe) {
-
-                    if (null == r || null == tpe || tpe.isShutdown() || 
tpe.isTerminated() || tpe.isTerminating()) {
-                        return;
-                    }
+        if (timerExecutor != null) {
+            this.executor = timerExecutor.executor;
+        } else {
+            this.executor = new ExecutorBuilder()
+                    .size(3)
+                    .prefix("EjbTimerPool")
+                    .build(SystemInstance.get().getOptions());
 
-                    try {
-                        if (!tpe.getQueue().offer(r, 30, TimeUnit.SECONDS)) {
-                            throw new RejectedExecutionException("Timeout 
waiting for executor slot");
-                        }
-                    } catch (InterruptedException e) {
-                        throw new RejectedExecutionException("Interrupted 
waiting for executor slot");
-                    }
-                }
-            }
-            );
-            ((ThreadPoolExecutor) this.executor).allowCoreThreadTimeOut(true);
-            SystemInstance.get().setComponent(Executor.class, this.executor);
+            SystemInstance.get().setComponent(TimerExecutor.class, new 
TimerExecutor(this.executor));
         }
 
         this.threadPoolExecutorUsed = (this.executor instanceof 
ThreadPoolExecutor);
@@ -106,6 +84,21 @@ public class DefaultTimerThreadPoolAdapt
         }
     }
 
+    // This is to prevent other parts of the code becoming dependent
+    // on the executor produced for EJB Timers
+    //
+    // If we want to share an Executor across the whole system
+    // for @Asynchronous and @Remote execution we should design
+    // that specifically and have it explicitly created somewhere
+    public final static class TimerExecutor {
+        private final Executor executor;
+
+        private TimerExecutor(Executor executor) {
+            if (executor == null) throw new IllegalArgumentException("executor 
cannot be null");
+            this.executor = executor;
+        }
+    }
+
     @Override
     public int blockForAvailableThreads() {
         if (this.threadPoolExecutorUsed) {
@@ -186,4 +179,5 @@ public class DefaultTimerThreadPoolAdapt
     public void setThreadPriority(final int threadPriority) {
         this.threadPriority = threadPriority;
     }
+
 }

Modified: 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java?rev=1375764&r1=1375763&r2=1375764&view=diff
==============================================================================
--- 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java
 (original)
+++ 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java
 Tue Aug 21 20:28:29 2012
@@ -56,6 +56,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.Map;
 import java.util.Properties;
 
 
@@ -63,7 +64,6 @@ public class EjbTimerServiceImpl impleme
     private static final long serialVersionUID = 1L;
     private static final Logger log = Logger.getInstance(LogCategory.TIMER, 
"org.apache.openejb.util.resources");
 
-    public static final String QUARTZ_THREAD_POOL_ADAPTER = 
"openejb.org.quartz.threadPool.class";
     public static final String QUARTZ_MAKE_SCHEDULER_THREAD_DAEMON = 
"org.quartz.scheduler.makeSchedulerThreadDaemon";
 
     public static final String OPENEJB_TIMEOUT_JOB_NAME = 
"OPENEJB_TIMEOUT_JOB";
@@ -130,26 +130,19 @@ public class EjbTimerServiceImpl impleme
         }
 
         final Properties properties = new Properties();
-        properties.putAll(SystemInstance.get().getProperties());
-        
properties.putAll(deployment.getModuleContext().getAppContext().getProperties());
-        properties.putAll(deployment.getModuleContext().getProperties());
-        properties.putAll(deployment.getProperties());
-
-        boolean newInstance = false;
-        for (String key : properties.stringPropertyNames()) {
-            if (key.startsWith("org.quartz.")) { // custom config -> don't use 
default scheduler
-                newInstance = true;
-                break;
-            }
-        }
+        putAll(properties, SystemInstance.get().getProperties());
+        putAll(properties, 
deployment.getModuleContext().getAppContext().getProperties());
+        putAll(properties, deployment.getModuleContext().getProperties());
+        putAll(properties, deployment.getProperties());
+
+        // custom config -> don't use default scheduler
+        boolean newInstance = properties.size() > 0;
 
         final SystemInstance systemInstance = SystemInstance.get();
 
         final String defaultThreadPool = 
DefaultTimerThreadPoolAdapter.class.getName();
         if 
(!properties.containsKey(StdSchedulerFactory.PROP_THREAD_POOL_CLASS)) {
-            properties.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,
-                systemInstance.hasProperty(QUARTZ_THREAD_POOL_ADAPTER) ?
-                    
systemInstance.getOptions().get(QUARTZ_THREAD_POOL_ADAPTER, 
SimpleThreadPool.class.getName()) : defaultThreadPool);
+            properties.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, 
defaultThreadPool);
         }
         if 
(!properties.containsKey(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME)) {
             properties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, 
"OpenEJB-TimerService-Scheduler");
@@ -206,6 +199,15 @@ public class EjbTimerServiceImpl impleme
         return thisScheduler;
     }
 
+    private static void putAll(Properties a, final Properties b) {
+        for (Map.Entry<Object, Object> entry : b.entrySet()) {
+            final String key = entry.getKey().toString();
+            if (key.startsWith("org.quartz.")) {
+                a.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
     public void shutdownMe() {
         cleanTimerData();
         shutdownMyScheduler();

Added: 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java?rev=1375764&view=auto
==============================================================================
--- 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
 (added)
+++ 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
 Tue Aug 21 20:28:29 2012
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.util;
+
+import org.apache.openejb.loader.Options;
+import org.apache.openejb.util.executor.OfferRejectedExecutionHandler;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ExecutorBuilder {
+
+    private int size = 10;
+    private String prefix = "Pool";
+    private ThreadFactory threadFactory;
+    private RejectedExecutionHandler rejectedExecutionHandler;
+
+    public ExecutorBuilder() {
+    }
+
+    public ExecutorBuilder size(int size) {
+        this.size = size;
+        return this;
+    }
+
+    public ExecutorBuilder prefix(String prefix) {
+        this.prefix = prefix;
+        return this;
+    }
+
+    public ExecutorBuilder threadFactory(ThreadFactory threadFactory) {
+        this.threadFactory = threadFactory;
+        return this;
+    }
+
+    public ExecutorBuilder rejectedExecutionHandler(RejectedExecutionHandler 
rejectedExecutionHandler) {
+        this.rejectedExecutionHandler = rejectedExecutionHandler;
+        return this;
+    }
+
+    public ThreadPoolExecutor build(Options options) {
+        final int corePoolSize = options.get(prefix + ".CorePoolSize", size);
+
+        // Default setting is for a fixed pool size, 
MaximumPoolSize==CorePoolSize
+        final int maximumPoolSize = Math.max(options.get(prefix + 
".MaximumPoolSize", corePoolSize), corePoolSize);
+
+        // Default QueueSize is bounded using the MaximumPoolSize
+        final int size = options.get(prefix + ".QueueSize", maximumPoolSize);
+
+        // Keep Threads inactive threads alive for 60 seconds by default
+        final Duration keepAliveTime = options.get(prefix + ".KeepAliveTime", 
new Duration(60, TimeUnit.SECONDS));
+
+        // All threads can be timed out by default
+        final boolean allowCoreThreadTimeout = options.get(prefix + 
".AllowCoreThreadTimeOut", true);
+
+        // If the user explicitly set the QueueSize to 0, we default QueueType 
to SYNCHRONOUS
+        final QueueType defaultQueueType = (size == 0) ? QueueType.SYNCHRONOUS 
: QueueType.LINKED;
+        final BlockingQueue queue = options.get(prefix + ".QueueType", 
defaultQueueType).create(options, prefix, size);
+
+        ThreadFactory factory = this.threadFactory;
+        if (factory == null) {
+            factory = new DaemonThreadFactory(prefix);
+        }
+
+        RejectedExecutionHandler handler = this.rejectedExecutionHandler;
+        if (handler == null) {
+            final Duration duration = options.get(prefix + ".OfferTimeout", 
new Duration(30, TimeUnit.SECONDS));
+            handler = new OfferRejectedExecutionHandler(duration);
+        }
+
+        final ThreadPoolExecutor threadPoolExecutor = new 
ThreadPoolExecutor(corePoolSize
+                , maximumPoolSize
+                , keepAliveTime.getTime()
+                , keepAliveTime.getUnit() != null ? keepAliveTime.getUnit() : 
TimeUnit.SECONDS
+                , queue
+                , factory
+                , handler
+        );
+
+        threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
+
+        return threadPoolExecutor;
+    }
+
+    /**
+     * @version $Rev$ $Date$
+     */
+    public static enum QueueType {
+        ARRAY,
+        LINKED,
+        PRIORITY,
+        SYNCHRONOUS;
+
+        public BlockingQueue create(Options options, final String prefix, 
final int queueSize) {
+            switch (this) {
+                case ARRAY: {
+                    return new ArrayBlockingQueue(queueSize);
+                }
+                case LINKED: {
+                    return new LinkedBlockingQueue(queueSize);
+                }
+                case PRIORITY: {
+                    return new PriorityBlockingQueue();
+                }
+                case SYNCHRONOUS: {
+                    return new SynchronousQueue(options.get(prefix + 
".QueueFair", false));
+                }
+                default: {
+                    // The Options class will throw an error if the user 
supplies an unknown enum string
+                    // The only way we can reach this is if we add a new 
QueueType element and forget to
+                    // implement it in the above switch statement.
+                    throw new IllegalArgumentException("Unknown QueueType 
type: " + this);
+                }
+            }
+        }
+    }
+}

Added: 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java?rev=1375764&view=auto
==============================================================================
--- 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
 (added)
+++ 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
 Tue Aug 21 20:28:29 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.util.executor;
+
+import org.apache.openejb.util.Duration;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class OfferRejectedExecutionHandler implements RejectedExecutionHandler 
{
+
+    private long timeout = 30;
+    private TimeUnit seconds = TimeUnit.SECONDS;
+
+    public OfferRejectedExecutionHandler(Duration duration) {
+        this(duration.getTime(), duration.getUnit() == null ? TimeUnit.SECONDS 
: duration.getUnit());
+    }
+
+    public OfferRejectedExecutionHandler(long timeout, TimeUnit timeUnit) {
+        if (timeout <= 0) throw new IllegalArgumentException("timeout must be 
greater than zero");
+        if (timeUnit == null) throw new IllegalArgumentException("TimeUnit 
must not be null");
+
+        this.timeout = timeout;
+        this.seconds = timeUnit;
+    }
+
+    @Override
+    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor 
tpe) {
+
+        if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() 
|| tpe.isTerminating()) {
+            return;
+        }
+
+        try {
+            if (!tpe.getQueue().offer(r, timeout, seconds)) {
+                throw new RejectedExecutionException("Timeout waiting for 
executor slot: waited " + timeout + " " + seconds.toString().toLowerCase());
+            }
+        } catch (InterruptedException e) {
+            throw new RejectedExecutionException("Interrupted waiting for 
executor slot");
+        }
+    }
+}


Reply via email to