This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 503e803b285 HDFS-17128. Updating SQLDelegationTokenSecretManager to use LoadingCa… (#5963)
503e803b285 is described below

commit 503e803b285d4d494eba3a4b0d12e235e5de08e5
Author: hchaverri <55413673+hchave...@users.noreply.github.com>
AuthorDate: Sat Aug 19 19:12:45 2023 -0700

    HDFS-17128. Updating SQLDelegationTokenSecretManager to use LoadingCa… (#5963)
---
 .../AbstractDelegationTokenSecretManager.java      |  12 ++-
 .../delegation/DelegationTokenLoadingCache.java    | 119 ++++++++++++++++++++
 .../SQLDelegationTokenSecretManager.java           |  96 ++++++++++++-----
 .../TestSQLDelegationTokenSecretManagerImpl.java   | 120 ++++++++++++++++++++-
 4 files changed, 312 insertions(+), 35 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index 61c3312c107..bc0ef624d5a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -82,9 +82,8 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
    * Cache of currently valid tokens, mapping from DelegationTokenIdentifier 
    * to DelegationTokenInformation. Protected by this object lock.
    */
-  protected final Map<TokenIdent, DelegationTokenInformation> currentTokens 
-      = new ConcurrentHashMap<>();
-  
+  protected Map<TokenIdent, DelegationTokenInformation> currentTokens;
+
   /**
    * Sequence number to create DelegationTokenIdentifier.
    * Protected by this object lock.
@@ -143,6 +142,7 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
     this.tokenRenewInterval = delegationTokenRenewInterval;
     this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
     this.storeTokenTrackingId = false;
+    this.currentTokens = new ConcurrentHashMap<>();
   }
 
   /**
@@ -757,10 +757,14 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
     for (TokenIdent ident : expiredTokens) {
       logExpireToken(ident);
       LOG.info("Removing expired token " + formatTokenId(ident));
-      removeStoredToken(ident);
+      removeExpiredStoredToken(ident);
     }
   }
 
+  protected void removeExpiredStoredToken(TokenIdent ident) throws IOException 
{
+    removeStoredToken(ident);
+  }
+
   public void stopThreads() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Stopping expired delegation token remover thread");
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenLoadingCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenLoadingCache.java
new file mode 100644
index 00000000000..2251f98c143
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenLoadingCache.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+
+
+/**
+ * Cache for delegation tokens that can handle high volume of tokens. A
+ * loading cache will prevent all active tokens from being in memory at the
+ * same time. It will also trigger more requests from the persistent token storage.
+ */
+public class DelegationTokenLoadingCache<K, V> implements Map<K, V> {
+  private LoadingCache<K, V> internalLoadingCache;
+
+  public DelegationTokenLoadingCache(long cacheExpirationMs, long 
maximumCacheSize,
+      Function<K, V> singleEntryFunction) {
+    this.internalLoadingCache = CacheBuilder.newBuilder()
+        .expireAfterWrite(cacheExpirationMs, TimeUnit.MILLISECONDS)
+        .maximumSize(maximumCacheSize)
+        .build(new CacheLoader<K, V>() {
+          @Override
+          public V load(K k) throws Exception {
+            return singleEntryFunction.apply(k);
+          }
+        });
+  }
+
+  @Override
+  public int size() {
+    return (int) this.internalLoadingCache.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return size() == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return this.internalLoadingCache.getIfPresent(key) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public V get(Object key) {
+    try {
+      return this.internalLoadingCache.get((K) key);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  @Override
+  public V put(K key, V value) {
+    this.internalLoadingCache.put(key, value);
+    return this.internalLoadingCache.getIfPresent(key);
+  }
+
+  @Override
+  public V remove(Object key) {
+    V value = this.internalLoadingCache.getIfPresent(key);
+    this.internalLoadingCache.invalidate(key);
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> m) {
+    this.internalLoadingCache.putAll(m);
+  }
+
+  @Override
+  public void clear() {
+    this.internalLoadingCache.invalidateAll();
+  }
+
+  @Override
+  public Set<K> keySet() {
+    return this.internalLoadingCache.asMap().keySet();
+  }
+
+  @Override
+  public Collection<V> values() {
+    return this.internalLoadingCache.asMap().values();
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet() {
+    return this.internalLoadingCache.asMap().entrySet();
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
index 4b6ae21d7a9..75f00d3f924 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
@@ -24,9 +24,13 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +50,13 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
   private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = 
SQL_DTSM_CONF_PREFIX
       + "token.seqnum.batch.size";
   public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
+  public static final String SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION = 
SQL_DTSM_CONF_PREFIX
+      + "token.loading.cache.expiration";
+  public static final long SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
+  public static final String SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE = 
SQL_DTSM_CONF_PREFIX
+      + "token.loading.cache.max.size";
+  public static final long SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE_DEFAULT = 
100000;
 
   // Batch of sequence numbers that will be requested by the sequenceNumCounter.
   // A new batch is requested once the sequenceNums available to a secret manager are
@@ -71,6 +82,13 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
 
     this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
         DEFAULT_SEQ_NUM_BATCH_SIZE);
+
+    long cacheExpirationMs = 
conf.getTimeDuration(SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION,
+        SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT, 
TimeUnit.MILLISECONDS);
+    long maximumCacheSize = conf.getLong(SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE,
+        SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE_DEFAULT);
+    this.currentTokens = new DelegationTokenLoadingCache<>(cacheExpirationMs, 
maximumCacheSize,
+        this::getTokenInfoFromSQL);
   }
 
   /**
@@ -126,15 +144,11 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
   @Override
   public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
       String canceller) throws IOException {
-    try (ByteArrayInputStream bis = new 
ByteArrayInputStream(token.getIdentifier());
-        DataInputStream din = new DataInputStream(bis)) {
-      TokenIdent id = createIdentifier();
-      id.readFields(din);
+    TokenIdent id = createTokenIdent(token.getIdentifier());
 
-      // Calling getTokenInfo to load token into local cache if not present.
-      // super.cancelToken() requires token to be present in local cache.
-      getTokenInfo(id);
-    }
+    // Calling getTokenInfo to load token into local cache if not present.
+    // super.cancelToken() requires token to be present in local cache.
+    getTokenInfo(id);
 
     return super.cancelToken(token, canceller);
   }
@@ -153,6 +167,24 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
     }
   }
 
+  @Override
+  protected void removeExpiredStoredToken(TokenIdent ident) {
+    try {
+      // Ensure that the token has not been renewed in SQL by
+      // another secret manager
+      DelegationTokenInformation tokenInfo = getTokenInfoFromSQL(ident);
+      if (tokenInfo.getRenewDate() >= Time.now()) {
+        LOG.info("Token was renewed by a different router and has not been 
deleted: {}", ident);
+        return;
+      }
+      removeStoredToken(ident);
+    } catch (NoSuchElementException e) {
+      LOG.info("Token has already been deleted by a different router: {}", 
ident);
+    } catch (Exception e) {
+      LOG.warn("Could not remove token {}", ident, e);
+    }
+  }
+
   /**
    * Obtains the DelegationTokenInformation associated with the given
    * TokenIdentifier in the SQL database.
@@ -160,29 +192,35 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
    * @return DelegationTokenInformation that matches the given TokenIdentifier or
    *         null if it doesn't exist in the database.
    */
-  @Override
-  protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
-    // Look for token in local cache
-    DelegationTokenInformation tokenInfo = super.getTokenInfo(ident);
-
-    if (tokenInfo == null) {
-      try {
-        // Look for token in SQL database
-        byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), 
ident.getBytes());
+  @VisibleForTesting
+  protected DelegationTokenInformation getTokenInfoFromSQL(TokenIdent ident) {
+    try {
+      byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), 
ident.getBytes());
+      if (tokenInfoBytes == null) {
+        // Throw exception so value is not added to cache
+        throw new NoSuchElementException("Token not found in SQL secret 
manager: " + ident);
+      }
+      return createTokenInfo(tokenInfoBytes);
+    } catch (SQLException | IOException e) {
+      LOG.error("Failed to get token in SQL secret manager", e);
+      throw new RuntimeException(e);
+    }
+  }
 
-        if (tokenInfoBytes != null) {
-          tokenInfo = new DelegationTokenInformation();
-          try (ByteArrayInputStream bis = new 
ByteArrayInputStream(tokenInfoBytes)) {
-            try (DataInputStream dis = new DataInputStream(bis)) {
-              tokenInfo.readFields(dis);
-            }
-          }
+  private TokenIdent createTokenIdent(byte[] tokenIdentBytes) throws 
IOException {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenIdentBytes);
+        DataInputStream din = new DataInputStream(bis)) {
+      TokenIdent id = createIdentifier();
+      id.readFields(din);
+      return id;
+    }
+  }
 
-          // Update token in local cache
-          currentTokens.put(ident, tokenInfo);
-        }
-      } catch (IOException | SQLException e) {
-        LOG.error("Failed to get token in SQL secret manager", e);
+  private DelegationTokenInformation createTokenInfo(byte[] tokenInfoBytes) 
throws IOException {
+    DelegationTokenInformation tokenInfo = new DelegationTokenInformation();
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) {
+      try (DataInputStream dis = new DataInputStream(bis)) {
+        tokenInfo.readFields(dis);
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
index 569a274042b..5165f7fd770 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
@@ -25,13 +25,21 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import 
org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -43,6 +51,7 @@ import org.junit.Test;
 public class TestSQLDelegationTokenSecretManagerImpl {
   private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
   private static final int TEST_MAX_RETRIES = 3;
+  private static final int TOKEN_EXPIRATION_SECONDS = 1;
   private static Configuration conf;
 
   @Before
@@ -111,6 +120,98 @@ public class TestSQLDelegationTokenSecretManagerImpl {
     }
   }
 
+  @Test
+  public void testCancelToken() throws Exception {
+    DelegationTokenManager tokenManager1 = 
createTokenManager(getShortLivedTokenConf());
+    DelegationTokenManager tokenManager2 = 
createTokenManager(getShortLivedTokenConf());
+
+    TestDelegationTokenSecretManager secretManager2 =
+        (TestDelegationTokenSecretManager) 
tokenManager2.getDelegationTokenSecretManager();
+
+    try {
+      // Create token on token manager 1
+      Token<? extends AbstractDelegationTokenIdentifier> token1 =
+          tokenManager1.createToken(UserGroupInformation.getCurrentUser(), 
"foo");
+
+      // Load token on token manager 2 to test it doesn't get stale
+      tokenManager2.verifyToken(token1);
+
+      // Cancel token on token manager 1
+      tokenManager1.cancelToken(token1, "foo");
+
+      // Validate that token cancellation is propagated to token manager 2
+      secretManager2.waitForTokenEviction(token1.decodeIdentifier());
+      LambdaTestUtils.intercept(SecretManager.InvalidToken.class,
+          () -> tokenManager2.verifyToken(token1));
+    } finally {
+      stopTokenManager(tokenManager1);
+      stopTokenManager(tokenManager2);
+    }
+  }
+
+  @Test
+  public void testRenewToken() throws Exception {
+    DelegationTokenManager tokenManager1 = 
createTokenManager(getShortLivedTokenConf());
+    DelegationTokenManager tokenManager2 = 
createTokenManager(getShortLivedTokenConf());
+
+    TestDelegationTokenSecretManager secretManager2 =
+        (TestDelegationTokenSecretManager) 
tokenManager2.getDelegationTokenSecretManager();
+
+    try {
+      // Create token on token manager 1
+      Token<? extends AbstractDelegationTokenIdentifier> token1 =
+          tokenManager1.createToken(UserGroupInformation.getCurrentUser(), 
"foo");
+      long expirationTime = Time.monotonicNow() +
+          TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 2;
+
+      // Load token on token manager 2 to test it doesn't get stale
+      tokenManager2.verifyToken(token1);
+
+      // Renew token on token manager 1 and verify token is updated on token manager 2
+      // Do this for long enough that the token should be expired if not renewed
+      AbstractDelegationTokenIdentifier token1Id =
+          (AbstractDelegationTokenIdentifier) token1.decodeIdentifier();
+      while (Time.monotonicNow() < expirationTime) {
+        tokenManager1.renewToken(token1, "foo");
+        callRemoveExpiredTokensAndValidateSQL(secretManager2, token1Id, true);
+        secretManager2.waitForTokenEviction(token1Id);
+        tokenManager2.verifyToken(token1);
+      }
+
+      // Stop renewing token and validate it's no longer valid and removed
+      // from SQL
+      Thread.sleep(TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 2);
+      LambdaTestUtils.intercept(SecretManager.InvalidToken.class,
+          () -> tokenManager2.verifyToken(token1));
+      callRemoveExpiredTokensAndValidateSQL(secretManager2, token1Id, false);
+    } finally {
+      stopTokenManager(tokenManager1);
+      stopTokenManager(tokenManager2);
+    }
+  }
+
+  private Configuration getShortLivedTokenConf() {
+    Configuration shortLivedConf = new Configuration(conf);
+    shortLivedConf.setTimeDuration(
+        
SQLDelegationTokenSecretManager.SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION,
+        200, TimeUnit.MILLISECONDS);
+    shortLivedConf.setInt(DelegationTokenManager.RENEW_INTERVAL, 
TOKEN_EXPIRATION_SECONDS);
+    return shortLivedConf;
+  }
+
+  private void callRemoveExpiredTokensAndValidateSQL(
+      TestDelegationTokenSecretManager secretManager, 
AbstractDelegationTokenIdentifier tokenId,
+      boolean expectedInSQL) throws SQLException {
+    secretManager.removeExpiredStoredToken(tokenId);
+    byte[] tokenInfo = 
secretManager.selectTokenInfo(tokenId.getSequenceNumber(),
+        tokenId.getBytes());
+    if (expectedInSQL) {
+      Assert.assertNotNull("Verify token exists in database", tokenInfo);
+    } else {
+      Assert.assertNull("Verify token was removed from database", tokenInfo);
+    }
+  }
+
   @Test
   public void testSequenceNumAllocation() throws Exception {
     int tokensPerManager = 
SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE * 5;
@@ -292,8 +393,13 @@ public class TestSQLDelegationTokenSecretManagerImpl {
   }
 
   private DelegationTokenManager createTokenManager() {
+    return createTokenManager(conf);
+  }
+
+  private DelegationTokenManager createTokenManager(Configuration config) {
     DelegationTokenManager tokenManager = new DelegationTokenManager(new 
Configuration(), null);
-    tokenManager.setExternalDelegationTokenSecretManager(new 
TestDelegationTokenSecretManager());
+    tokenManager.setExternalDelegationTokenSecretManager(
+        new TestDelegationTokenSecretManager(config));
     return tokenManager;
   }
 
@@ -401,7 +507,7 @@ public class TestSQLDelegationTokenSecretManagerImpl {
       return keyRollLock;
     }
 
-    TestDelegationTokenSecretManager() {
+    TestDelegationTokenSecretManager(Configuration conf) {
       super(conf, new TestConnectionFactory(conf),
           SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new 
TestRetryHandler()));
     }
@@ -428,6 +534,16 @@ public class TestSQLDelegationTokenSecretManagerImpl {
       }
     }
 
+    public void waitForTokenEviction(TokenIdentifier tokenId)
+        throws InterruptedException, TimeoutException {
+      // Wait until token is not found on cache
+      GenericTestUtils.waitFor(() -> !this.currentTokens.containsKey(tokenId), 
100, 5000);
+    }
+
+    public void removeExpiredStoredToken(TokenIdentifier tokenId) {
+      super.removeExpiredStoredToken((AbstractDelegationTokenIdentifier) 
tokenId);
+    }
+
     public void setReadOnly(boolean readOnly) {
       ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to