This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b422e9a5c59 Fix CN pipe procedures restore dead lock (#16324)
b422e9a5c59 is described below

commit b422e9a5c59391a2aa17234c3cf5a230bbce9830
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Sep 10 03:19:00 2025 -0500

    Fix CN pipe procedures restore dead lock (#16324)
    
    * fix pipe
    
    * fix
    
    * rename
    
    * fix
    
    * add comment
---
 .../pipe/coordinator/runtime/PipeMetaSyncer.java   |  4 +--
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  2 +-
 .../pipe/coordinator/task/PipeTaskCoordinator.java | 14 ++++----
 .../coordinator/task/PipeTaskCoordinatorLock.java  | 13 +++++---
 .../subscription/SubscriptionCoordinator.java      |  2 +-
 .../confignode/procedure/ProcedureExecutor.java    | 38 +++++++++++++++++++++-
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 19 ++++++++---
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     | 14 +++++---
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   | 14 +++++---
 ...bstractOperateSubscriptionAndPipeProcedure.java |  7 +++-
 11 files changed, 96 insertions(+), 33 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index 1df5ac021ca..aaafcda346d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -146,7 +146,7 @@ public class PipeMetaSyncer {
 
   private boolean autoRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-        configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
+        configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
     if (pipeTaskInfo == null) {
       LOGGER.warn("Failed to acquire pipe lock for auto restart pipe task.");
       return false;
@@ -160,7 +160,7 @@ public class PipeMetaSyncer {
 
   private boolean handleSuccessfulRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-        configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
+        configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
     if (pipeTaskInfo == null) {
       LOGGER.warn("Failed to acquire pipe lock for handling successful restart.");
       return false;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index ace07f5e2d3..8e13a0460e4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -99,7 +99,7 @@ public class PipeHeartbeatParser {
         .submit(
             () -> {
               final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-                  configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
+                  configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
               if (pipeTaskInfo == null) {
                 LOGGER.warn(
                     "Failed to acquire lock when parseHeartbeat from node (id={}).", nodeId);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 4aaf3ab46c3..25f8e0c0cc5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,10 +69,11 @@ public class PipeTaskCoordinator {
    * @return the pipe task info holder, which can be used to get the pipe task info. The holder is
    *     null if the lock is not acquired.
    */
-  public AtomicReference<PipeTaskInfo> tryLock() {
-    if (pipeTaskCoordinatorLock.tryLock()) {
+  public Pair<AtomicReference<PipeTaskInfo>, Long> tryLock() {
+    long lockSeqId = pipeTaskCoordinatorLock.tryLock();
+    if (lockSeqId != -1) {
       pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-      return pipeTaskInfoHolder;
+      return new Pair<>(pipeTaskInfoHolder, lockSeqId);
     }
 
     return null;
@@ -83,10 +85,10 @@ public class PipeTaskCoordinator {
    * @return the {@link PipeTaskInfo} holder, which can be used to get the {@link PipeTaskInfo}.
    *     Wait until lock is acquired
    */
-  public AtomicReference<PipeTaskInfo> lock() {
-    pipeTaskCoordinatorLock.lock();
+  public Pair<AtomicReference<PipeTaskInfo>, Long> lock() {
+    long lockSeqId = pipeTaskCoordinatorLock.lock();
     pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-    return pipeTaskInfoHolder;
+    return new Pair<>(pipeTaskInfoHolder, lockSeqId);
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index e57add9a001..0caf620e7f2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -37,8 +37,9 @@ public class PipeTaskCoordinatorLock {
 
   private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
   private final AtomicLong idGenerator = new AtomicLong(0);
+  private final AtomicLong lockSeqIdGenerator = new AtomicLong(0);
 
-  public void lock() {
+  public long lock() {
     try {
       final long id = idGenerator.incrementAndGet();
       LOGGER.debug(
@@ -50,15 +51,17 @@ public class PipeTaskCoordinatorLock {
           "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
           id,
           Thread.currentThread().getName());
+      return lockSeqIdGenerator.incrementAndGet();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error(
           "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
           Thread.currentThread().getName());
+      return -1;
     }
   }
 
-  public boolean tryLock() {
+  public long tryLock() {
     try {
       final long id = idGenerator.incrementAndGet();
       LOGGER.debug(
@@ -70,20 +73,20 @@ public class PipeTaskCoordinatorLock {
             "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
             id,
             Thread.currentThread().getName());
-        return true;
+        return lockSeqIdGenerator.incrementAndGet();
       } else {
         LOGGER.info(
             "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout",
             id,
             Thread.currentThread().getName());
-        return false;
+        return -1;
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error(
           "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
           Thread.currentThread().getName());
-      return false;
+      return -1;
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index b52f958d30a..d90e1bbaa30 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -79,7 +79,7 @@ public class SubscriptionCoordinator {
   /////////////////////////////// Lock ///////////////////////////////
 
   public AtomicReference<SubscriptionInfo> tryLock() {
-    if (coordinatorLock.tryLock()) {
+    if (coordinatorLock.tryLock() != -1) {
       subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
       return subscriptionInfoHolder;
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 0d8368583b4..f0f28ec2741 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
@@ -37,10 +38,12 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -49,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
 
@@ -116,9 +120,41 @@ public class ProcedureExecutor<Env> {
     recover();
   }
 
+  /***
+   * Filter out pipe procedures that do not need to re-acquire lock and re-execute when there are multiple locked pipe procedures during restore.
+   * @return non pipe procedures and one pipe procedure with max lock seq id (if there is.)
+   */
+  private List<Procedure<Env>> filteredProcedureList(final List<Procedure<Env>> procedures) {
+    List<Procedure<Env>> nonPipeOrLockedProcedures =
+        procedures.stream()
+            .filter(p -> !(p instanceof AbstractOperatePipeProcedureV2) || !p.isLockedWhenLoading())
+            .collect(Collectors.toList());
+
+    List<AbstractOperatePipeProcedureV2> lockedPipeProcedures =
+        procedures.stream()
+            .filter(p -> p instanceof AbstractOperatePipeProcedureV2 && p.isLockedWhenLoading())
+            .map(AbstractOperatePipeProcedureV2.class::cast)
+            .collect(Collectors.toList());
+    Optional<Procedure<Env>> maxPipeProcedure =
+        lockedPipeProcedures.stream()
+            .max(Comparator.comparingLong(AbstractOperatePipeProcedureV2::getLockSeqId))
+            .map(p -> (Procedure<Env>) p);
+
+    if (lockedPipeProcedures.size() > 1) {
+      LOG.warn(
+          "[Procedure restore]Detected multiple locked pipe procedures in procedure executor {}, only keep last one {}",
+          lockedPipeProcedures,
+          maxPipeProcedure.get());
+    }
+
+    maxPipeProcedure.ifPresent(nonPipeOrLockedProcedures::add);
+    return nonPipeOrLockedProcedures;
+  }
+
   private void recover() {
     // 1.Build rollback stack
-    List<Procedure<Env>> procedureList = getProcedureListFromDifferentVersion();
+    List<Procedure<Env>> procedureList =
+        filteredProcedureList(getProcedureListFromDifferentVersion());
     // Load procedure wal file
     for (Procedure<Env> proc : procedureList) {
       if (proc.isFinished()) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 25466d33983..0dde52ab5dd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,17 +95,17 @@ public abstract class AbstractOperatePipeProcedureV2
   // This variable should not be serialized into procedure store,
   // putting it here is just for convenience
   protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
+  protected long lockSeqId;
 
   private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
       "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
 
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    return configNodeProcedureEnv
-        .getConfigManager()
-        .getPipeManager()
-        .getPipeTaskCoordinator()
-        .lock();
+    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
+        configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
+    lockSeqId = lockRes.right;
+    return lockRes.left;
   }
 
   @Override
@@ -188,6 +189,10 @@ public abstract class AbstractOperatePipeProcedureV2
     }
   }
 
+  public long getLockSeqId() {
+    return lockSeqId;
+  }
+
   protected abstract PipeTaskOperation getOperation();
 
   /**
@@ -612,11 +617,15 @@ public abstract class AbstractOperatePipeProcedureV2
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
     ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
+    ReadWriteIOUtils.write(lockSeqId, stream);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     isRollbackFromOperateOnDataNodesSuccessful = ReadWriteIOUtils.readBool(byteBuffer);
+    if (byteBuffer.remaining() >= Long.BYTES) {
+      lockSeqId = ReadWriteIOUtils.readLong(byteBuffer);
+    }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 665a3782a91..28c59933cb0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -122,7 +122,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
     final SubscriptionCoordinator subscriptionCoordinator =
         env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator();
 
-    final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock();
+    final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock().left;
     pipePluginCoordinator.lock();
     SubscriptionInfo subscriptionInfo = subscriptionCoordinator.getSubscriptionInfo();
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 401859f0a7e..6b2fec6f381 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,11 +65,14 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
   @Override
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    return configNodeProcedureEnv
-        .getConfigManager()
-        .getPipeManager()
-        .getPipeTaskCoordinator()
-        .tryLock();
+    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
+        configNodeProcedureEnv
+            .getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .tryLock();
+    lockSeqId = lockRes.right;
+    return lockRes.left;
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 393a8bd5ab8..81ed75b7db2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,11 +74,14 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
   @Override
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    return configNodeProcedureEnv
-        .getConfigManager()
-        .getPipeManager()
-        .getPipeTaskCoordinator()
-        .tryLock();
+    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
+        configNodeProcedureEnv
+            .getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .tryLock();
+    lockSeqId = lockRes.right;
+    return lockRes.left;
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index de90951262c..16391a9ef58 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -50,7 +50,12 @@ public abstract class AbstractOperateSubscriptionAndPipeProcedure
     LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", getProcId());
 
     pipeTaskInfo =
-        configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
+        configNodeProcedureEnv
+            .getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .lock()
+            .left;
     if (pipeTaskInfo == null) {
       LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
     } else {

Reply via email to