[ 
https://issues.apache.org/jira/browse/HADOOP-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922670#comment-17922670
 ] 

ASF GitHub Bot commented on HADOOP-19233:
-----------------------------------------

bhattmanish98 commented on code in PR #7265:
URL: https://github.com/apache/hadoop/pull/7265#discussion_r1937021494


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java:
##########
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_ID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS_DESCRIPTION;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
+
+/**
+ * Orchestrator for rename over Blob endpoint. Handles both directory and file
+ * renames. Blob Endpoint does not expose rename API, this class is responsible
+ * for copying the blobs and deleting the source blobs.
+ * <p>
+ * For directory rename, it recursively lists the blobs in the source 
directory and
+ * copies them to the destination directory.
+ */
+public class BlobRenameHandler extends ListActionTaker {
+
+  public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
+
+  private final String srcEtag;
+
+  private final Path src, dst;
+
+  private final boolean isAtomicRename, isAtomicRenameRecovery;
+
+  private final TracingContext tracingContext;
+
+  private AbfsLease srcAbfsLease;
+
+  private String srcLeaseId;
+
+  private final List<AbfsLease> leases = new ArrayList<>();
+
+  private final AtomicInteger operatedBlobCount = new AtomicInteger(0);
+
+  /** Constructor.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param abfsClient AbfsBlobClient to use for the rename operation
+   * @param srcEtag eTag of the source path
+   * @param isAtomicRename true if the rename operation is atomic
+   * @param isAtomicRenameRecovery true if the rename operation is a recovery 
of a previous failed atomic rename operation
+   * @param tracingContext object of tracingContext used for the tracing of 
the server calls.
+   */
+  public BlobRenameHandler(final String src,
+      final String dst,
+      final AbfsBlobClient abfsClient,
+      final String srcEtag,
+      final boolean isAtomicRename,
+      final boolean isAtomicRenameRecovery,
+      final TracingContext tracingContext) {
+    super(new Path(src), abfsClient, tracingContext);
+    this.srcEtag = srcEtag;
+    this.tracingContext = tracingContext;
+    this.src = new Path(src);
+    this.dst = new Path(dst);
+    this.isAtomicRename = isAtomicRename;
+    this.isAtomicRenameRecovery = isAtomicRenameRecovery;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  int getMaxConsumptionParallelism() {
+    return getAbfsClient().getAbfsConfiguration()
+        .getBlobRenameDirConsumptionParallelism();
+  }
+
+  /**
+   * Orchestrates the rename operation.
+   *
+   * @return AbfsClientRenameResult containing the result of the rename 
operation
+   * @throws AzureBlobFileSystemException if server call fails
+   */
+  public boolean execute() throws AzureBlobFileSystemException {
+    PathInformation pathInformation = getPathInformation(src, tracingContext);
+    boolean result = false;
+    if (preCheck(src, dst, pathInformation)) {
+      RenameAtomicity renameAtomicity = null;
+      if (pathInformation.getIsDirectory()
+          && pathInformation.getIsImplicit()) {
+        AbfsRestOperation createMarkerOp = getAbfsClient().createPath(
+            src.toUri().getPath(),
+            false, false, null,
+            false, null, null, tracingContext);
+        pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult()));
+      }
+      try {
+        if (isAtomicRename) {
+          /*
+           * Conditionally get a lease on the source blob to prevent other 
writers
+           * from changing it. This is used for correctness in HBase when log 
files
+           * are renamed. When the HBase master renames a log file folder, the 
lease
+           * locks out other writers.  This prevents a region server that the 
master
+           * thinks is dead, but is still alive, from committing additional 
updates.
+           * This is different than when HBase runs on HDFS, where the region 
server
+           * recovers the lease on a log file, to gain exclusive access to it, 
before
+           * it splits it.
+           */
+          if (srcAbfsLease == null) {
+            srcAbfsLease = takeLease(src, srcEtag);
+          }
+          srcLeaseId = srcAbfsLease.getLeaseID();
+          if (!isAtomicRenameRecovery && pathInformation.getIsDirectory()) {
+            /*
+             * if it is not a resume of a previous failed atomic rename 
operation,
+             * create renameJson file.
+             */
+            renameAtomicity = getRenameAtomicity(pathInformation);
+            renameAtomicity.preRename();
+          }
+        }
+        if (pathInformation.getIsDirectory()) {
+          result = listRecursiveAndTakeAction() && finalSrcRename();
+        } else {
+          result = renameInternal(src, dst);
+        }
+      } finally {
+        if (srcAbfsLease != null) {
+          // If the operation is successful, cancel the timer and no need to 
release
+          // the lease as rename on the blob-path has taken place.
+          if (result) {
+            srcAbfsLease.cancelTimer();
+          } else {
+            srcAbfsLease.free();
+          }
+        }
+      }
+      if (result && renameAtomicity != null) {
+        renameAtomicity.postRename();
+      }
+    }
+    return result;
+  }
+
+  /** Final rename operation after all the blobs have been copied.
+   *
+   * @return true if rename is successful
+   * @throws AzureBlobFileSystemException if server call fails
+   */
+  private boolean finalSrcRename() throws AzureBlobFileSystemException {
+    tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1);
+    try {
+      return renameInternal(src, dst);
+    } finally {
+      tracingContext.setOperatedBlobCount(null);
+    }
+  }
+
+  /** Gets the rename atomicity object.
+   *
+   * @param pathInformation object containing the path information of the 
source path
+   *
+   * @return RenameAtomicity object
+   */
+  @VisibleForTesting
+  public RenameAtomicity getRenameAtomicity(final PathInformation 
pathInformation) {
+    return new RenameAtomicity(src,
+        dst,
+        new Path(src.getParent(), src.getName() + RenameAtomicity.SUFFIX),
+        tracingContext,
+        pathInformation.getETag(),
+        getAbfsClient());
+  }
+
+  /** Takes a lease on the path.
+   *
+   * @param path path on which the lease is to be taken
+   * @param eTag eTag of the path
+   *
+   * @return object containing the lease information
+   * @throws AzureBlobFileSystemException if server call fails
+   */
+  private AbfsLease takeLease(final Path path, final String eTag)
+      throws AzureBlobFileSystemException {
+    AbfsLease lease = new AbfsLease(getAbfsClient(), path.toUri().getPath(),
+        false,
+        getAbfsClient().getAbfsConfiguration()
+            .getAtomicRenameLeaseRefreshDuration(),
+        eTag, tracingContext);
+    leases.add(lease);
+    return lease;
+  }
+
+  /** Checks if the path contains a colon.
+   *
+   * @param p path to check
+   *
+   * @return true if the path contains a colon
+   */
+  private boolean containsColon(Path p) {
+    return p.toUri().getPath().contains(COLON);
+  }
+
+  /**
+   * Since, server doesn't have a rename API and would not be able to check 
HDFS
+   * contracts, client would have to ensure that no HDFS contract is violated.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param pathInformation object in which path information of the source 
path would be stored
+   *
+   * @return true if the pre-checks pass
+   * @throws AzureBlobFileSystemException if server call fails or given paths 
are invalid.
+   */
+  private boolean preCheck(final Path src, final Path dst,
+      final PathInformation pathInformation)
+      throws AzureBlobFileSystemException {
+    validateDestinationPath(src, dst);
+    validateSourcePath(pathInformation);
+    validateDestinationPathNotExist(src, dst, pathInformation);
+    validateDestinationParentExist(src, dst, pathInformation);
+
+    return true;
+  }
+
+  /**
+   * Validate if the format of the destination path is correct and if the 
destination
+   * path is not a sub-directory of the source path.
+   *
+   * @param src source path
+   * @param dst destination path
+   *
+   * @throws AbfsRestOperationException if the destination path is invalid
+   */
+  private void validateDestinationPath(final Path src, final Path dst)
+      throws AbfsRestOperationException {
+    if (containsColon(dst)) {
+      throw new AbfsRestOperationException(
+          HttpURLConnection.HTTP_BAD_REQUEST,
+          AzureServiceErrorCode.INVALID_RENAME_DESTINATION.getErrorCode(), 
null,
+          new PathIOException(dst.toUri().getPath(),
+              "Destination path contains colon"));
+    }
+
+    validateDestinationIsNotSubDir(src, dst);
+  }
+
+  /**
+   * Validate if the destination path is not a sub-directory of the source 
path.
+   *
+   * @param src source path
+   * @param dst destination path
+   */
+  private void validateDestinationIsNotSubDir(final Path src,
+      final Path dst) throws AbfsRestOperationException {
+    LOG.debug("Check if the destination is subDirectory");
+    Path nestedDstParent = dst.getParent();
+    if (nestedDstParent != null && nestedDstParent.toUri()
+        .getPath()
+        .indexOf(src.toUri().getPath()) == 0) {
+      LOG.info("Rename src: {} dst: {} failed as dst is subDir of src",
+          src, dst);
+      throw new AbfsRestOperationException(HttpURLConnection.HTTP_CONFLICT,
+          AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode(),
+          AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorMessage(),
+          new Exception(
+              
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode()));
+    }
+  }
+
+  /**
+   * Validate if the source path exists and if the client knows the ETag of 
the source path,
+   * then the ETag should match with the server.
+   *
+   * @param pathInformation object containing the path information of the 
source path
+   *
+   * @throws AbfsRestOperationException if the source path is not found or if 
the ETag of the source
+   *                                    path does not match with the server.
+   */
+  private void validateSourcePath(final PathInformation pathInformation)
+      throws AzureBlobFileSystemException {
+    if (!pathInformation.getPathExists()) {
+      throw new AbfsRestOperationException(
+          HttpURLConnection.HTTP_NOT_FOUND,
+          AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), null,
+          new Exception(
+              AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode()));
+    }
+    if (srcEtag != null && !srcEtag.equals(pathInformation.getETag())) {
+      throw new AbfsRestOperationException(
+          HttpURLConnection.HTTP_CONFLICT,
+          AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null,
+          new Exception(
+              AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode()));
+    }
+  }
+
+  /** Validate if the destination path does not exist.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param pathInformation object containing the path information of the 
source path
+   *
+   * @throws AbfsRestOperationException if the destination path already exists
+   */
+  private void validateDestinationPathNotExist(final Path src,
+      final Path dst,
+      final PathInformation pathInformation)
+      throws AzureBlobFileSystemException {
+    /*
+     * Destination path name can be same to that of source path name only in 
the
+     * case of a directory rename.
+     *
+     * In case the directory is being renamed to some other name, the 
destination
+     * check would happen on the AzureBlobFileSystem#rename method.
+     */
+    if (pathInformation.getIsDirectory() && dst.getName()
+        .equals(src.getName())) {
+      PathInformation dstPathInformation = getPathInformation(
+          dst,
+          tracingContext);
+      if (dstPathInformation.getPathExists()) {
+        LOG.info(
+            "Rename src: {} dst: {} failed as qualifiedDst already exists",
+            src, dst);
+        throw new AbfsRestOperationException(
+            HttpURLConnection.HTTP_CONFLICT,
+            AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null,
+            null);
+      }
+    }
+  }
+
+  /** Validate if the parent of the destination path exists.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param pathInformation object containing the path information of the 
source path
+   *
+   * @throws AbfsRestOperationException if the parent of the destination path 
does not exist
+   */
+  private void validateDestinationParentExist(final Path src,
+      final Path dst,
+      final PathInformation pathInformation)
+      throws AzureBlobFileSystemException {
+    final Path nestedDstParent = dst.getParent();
+    if (!dst.isRoot() && nestedDstParent != null && !nestedDstParent.isRoot()
+        && (
+        !pathInformation.getIsDirectory() || !dst.getName()
+            .equals(src.getName()))) {
+      PathInformation nestedDstInfo = getPathInformation(
+          nestedDstParent,
+          tracingContext);
+      if (!nestedDstInfo.getPathExists() || !nestedDstInfo.getIsDirectory()) {
+        throw new AbfsRestOperationException(
+            HttpURLConnection.HTTP_NOT_FOUND,
+            RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode(), null,
+            new Exception(
+                RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()));
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  boolean takeAction(final Path path) throws AzureBlobFileSystemException {
+    return renameInternal(path, getDstPathForBlob(dst, path, src));
+  }
+
+  /** Renames the source path to the destination path.
+   *
+   * @param path source path
+   * @param destinationPathForBlobPartOfRenameSrcDir destination path
+   *
+   * @return true if rename is successful
+   * @throws AzureBlobFileSystemException if server call fails
+   */
+  private boolean renameInternal(final Path path,
+      final Path destinationPathForBlobPartOfRenameSrcDir)
+      throws AzureBlobFileSystemException {
+    final String leaseId;
+    AbfsLease abfsLease = null;
+    if (isAtomicRename) {
+      /*
+       * To maintain atomicity of rename of the path, lease is taken on the 
path.
+       */
+      if (path.equals(src)) {
+        abfsLease = srcAbfsLease;
+        leaseId = srcLeaseId;
+      } else {
+        abfsLease = takeLease(path, null);
+        leaseId = abfsLease.getLeaseID();
+      }
+    } else {
+      leaseId = null;
+    }
+    boolean operated = false;
+    try {
+      copyPath(path, destinationPathForBlobPartOfRenameSrcDir, leaseId);
+      getAbfsClient().deleteBlobPath(path, leaseId, tracingContext);
+      operated = true;
+    } finally {
+      if (abfsLease != null) {
+        // If the operation is successful, cancel the timer and no need to 
release
+        // the lease as delete on the blob-path has taken place.
+        if (operated) {
+          abfsLease.cancelTimer();
+        } else {
+          abfsLease.free();
+        }
+      }
+    }
+    operatedBlobCount.incrementAndGet();
+    return true;
+  }
+
+  /** Copies the source path to the destination path.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param leaseId lease id for the source path
+   *
+   * @throws AzureBlobFileSystemException if server call fails
+   */
+  private void copyPath(final Path src, final Path dst, final String leaseId)
+      throws AzureBlobFileSystemException {
+    String copyId;
+    try {
+      AbfsRestOperation copyPathOp = getAbfsClient().copyBlob(src, dst, 
leaseId,
+          tracingContext);
+      final String progress = copyPathOp.getResult()
+          .getResponseHeader(X_MS_COPY_STATUS);
+      if (COPY_STATUS_SUCCESS.equalsIgnoreCase(progress)) {
+        return;
+      }
+      copyId = copyPathOp.getResult()
+          .getResponseHeader(X_MS_COPY_ID);
+    } catch (AbfsRestOperationException ex) {
+      if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        AbfsRestOperation dstPathStatus = getAbfsClient().getPathStatus(
+            dst.toUri().getPath(),
+            tracingContext, null, false);
+        final String srcCopyPath = ROOT_PATH + getAbfsClient().getFileSystem()
+            + src.toUri().getPath();
+        if (dstPathStatus != null && dstPathStatus.getResult() != null
+            && (srcCopyPath.equals(getDstSource(dstPathStatus)))) {
+          return;
+        }
+      }
+      throw ex;
+    }
+    final long pollWait = getAbfsClient().getAbfsConfiguration()
+        .getBlobCopyProgressPollWaitMillis();
+    final long maxWait = getAbfsClient().getAbfsConfiguration()
+        .getBlobCopyProgressMaxWaitMillis();
+    long startTime = System.currentTimeMillis();
+    while (handleCopyInProgress(dst, tracingContext, copyId)
+        == BlobCopyProgress.PENDING) {
+      if (System.currentTimeMillis() - startTime > maxWait) {
+        throw new TimeoutException(
+            String.format("Blob copy progress wait time exceeded "

Review Comment:
   As discussed offline, this is not needed.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java:
##########
@@ -230,6 +240,28 @@ private static String replacedUrl(String baseUrl, String 
oldString, String newSt
     return sb.toString();
   }
 
+  public static boolean isKeyForDirectorySet(String key, Set<String> dirSet) {

Review Comment:
   Added.





> ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint
> --------------------------------------------------------------------------
>
>                 Key: HADOOP-19233
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19233
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Anuj Modi
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we only support rename and delete operations on the DFS endpoint. 
> Supporting these operations on the Blob endpoint needs dedicated handling 
> because the Blob endpoint does not account for hierarchy. We need to ensure 
> that the HDFS contracts are maintained when performing rename and delete 
> operations. Renaming or deleting a directory over the Blob endpoint requires 
> the client to handle the orchestration and rename or delete all the blobs 
> within the specified directory.
>  
> The task outlines the considerations for implementing rename and delete 
> operations for the FNS-blob endpoint to ensure compatibility with HDFS 
> contracts.
>  * {*}Blob Endpoint Usage{*}: The task addresses the need for abstraction in 
> the code to maintain HDFS contracts while performing rename and delete 
> operations on the blob endpoint, which does not support hierarchy.
>  * {*}Rename Operations{*}: The {{AzureBlobFileSystem#rename()}} method will 
> use a {{RenameHandler}} instance to handle rename operations, with separate 
> handlers for the DFS and blob endpoints. This method includes prechecks, 
> destination adjustments, and orchestration of directory renaming for blobs.
>  * {*}Atomic Rename{*}: Atomic renaming is essential for blob endpoints, as 
> it requires orchestration to copy or delete each blob within the directory. A 
> configuration will allow developers to specify directories for atomic 
> renaming, with a JSON file to track the status of renames.
>  * {*}Delete Operations{*}: Delete operations are simpler than renames, 
> requiring fewer HDFS contract checks. For blob endpoints, the client must 
> handle orchestration, including managing orphaned directories created by 
> AzCopy.
>  * {*}Orchestration for Rename/Delete{*}: Orchestration for rename and delete 
> operations over blob endpoints involves listing blobs and performing actions 
> on each blob. The process must be optimized to handle large numbers of blobs 
> efficiently.
>  * {*}Need for Optimization{*}: Optimization is crucial because the 
> {{ListBlob}} API can return a maximum of 5000 blobs at once, necessitating 
> multiple calls for large directories. The task proposes a producer-consumer 
> model to handle blobs in parallel, thereby reducing processing time and 
> memory usage.
>  * {*}Producer-Consumer Design{*}: The proposed design includes a producer to 
> list blobs, a queue to store the blobs, and a consumer to process them in 
> parallel. This approach aims to improve efficiency and mitigate memory issues.
> More details will follow.
> Prerequisites for this Patch:
> 1. HADOOP-19187 ABFS: [FnsOverBlob]Making AbfsClient Abstract for supporting 
> both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. HADOOP-19226 ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob 
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. HADOOP-19207 ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs 
> and Metadata APIs - ASF JIRA (apache.org)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to