[ 
https://issues.apache.org/jira/browse/HADOOP-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913265#comment-17913265
 ] 

ASF GitHub Bot commented on HADOOP-19233:
-----------------------------------------

anmolanmol1234 commented on code in PR #7265:
URL: https://github.com/apache/hadoop/pull/7265#discussion_r1916387329


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java:
##########
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS_DESCRIPTION;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
+
+/**
+ * Orchestrator for rename over the Blob endpoint. Handles both directory and file
+ * renames. The Blob endpoint does not expose a rename API, so this class is responsible
+ * for copying the blobs and deleting the source blobs.
+ * <p>
+ * For directory rename, it recursively lists the blobs in the source directory and
+ * copies them to the destination directory.
+ */
+public class BlobRenameHandler extends ListActionTaker {
+
+    public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
+
+    private final String srcEtag;
+
+    private final Path src, dst;
+
+    private final boolean isAtomicRename, isAtomicRenameRecovery;
+
+    private final TracingContext tracingContext;
+
+    private AbfsLease srcAbfsLease;
+
+    private String srcLeaseId;
+
+    private final List<AbfsLease> leases = new ArrayList<>();
+
+    private final AtomicInteger operatedBlobCount = new AtomicInteger(0);
+
+    /** Constructor.
+     *
+     * @param src source path
+     * @param dst destination path
+     * @param abfsClient AbfsBlobClient to use for the rename operation
+     * @param srcEtag eTag of the source path
+     * @param isAtomicRename true if the rename operation is atomic
+     * @param isAtomicRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
+     * @param tracingContext object of tracingContext used for the tracing of the server calls.
+     */
+    public BlobRenameHandler(final String src,
+                             final String dst,
+                             final AbfsBlobClient abfsClient,
+                             final String srcEtag,
+                             final boolean isAtomicRename,
+                             final boolean isAtomicRenameRecovery,
+                             final TracingContext tracingContext) {
+        super(new Path(src), abfsClient, tracingContext);
+        this.srcEtag = srcEtag;
+        this.tracingContext = tracingContext;
+        this.src = new Path(src);
+        this.dst = new Path(dst);
+        this.isAtomicRename = isAtomicRename;
+        this.isAtomicRenameRecovery = isAtomicRenameRecovery;
+    }
+
+    public BlobRenameHandler(final String src,
+                             final String dst,
+                             final AbfsBlobClient abfsClient,
+                             final String srcEtag,
+                             final boolean isAtomicRename,
+                             final boolean isAtomicRenameRecovery,
+                             final AbfsLease srcAbfsLease,
+                             final TracingContext tracingContext) {
+        this(src, dst, abfsClient, srcEtag, isAtomicRename, isAtomicRenameRecovery,
+                tracingContext);
+        this.srcAbfsLease = srcAbfsLease;
+    }
+
+    @Override
+    int getMaxConsumptionParallelism() {
+        return getAbfsClient().getAbfsConfiguration()
+                .getBlobRenameDirConsumptionParallelism();
+    }
+
+    /**
+     * Orchestrates the rename operation.
+     *
+     * @return true if the rename operation is successful
+     * @throws AzureBlobFileSystemException if server call fails
+     */
+    public boolean execute() throws AzureBlobFileSystemException {
+        PathInformation pathInformation = new PathInformation();
+        boolean result = false;
+        if (preCheck(src, dst, pathInformation)) {
+            RenameAtomicity renameAtomicity = null;
+            if (pathInformation.getIsDirectory()
+                    && pathInformation.getIsImplicit()) {
+                AbfsRestOperation createMarkerOp = getAbfsClient().createPath(src.toUri().getPath(),
+                        false, false, null,
+                        false, null, null, tracingContext);
+                pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult()));
+            }
+            try {
+                if (isAtomicRename) {
+                    /*
+                     * Conditionally get a lease on the source blob to prevent other writers
+                     * from changing it. This is used for correctness in HBase when log files
+                     * are renamed. When the HBase master renames a log file folder, the lease
+                     * locks out other writers.  This prevents a region server that the master
+                     * thinks is dead, but is still alive, from committing additional updates.
+                     * This is different than when HBase runs on HDFS, where the region server
+                     * recovers the lease on a log file, to gain exclusive access to it, before
+                     * it splits it.
+                     */
+                    if (srcAbfsLease == null) {
+                        srcAbfsLease = takeLease(src, srcEtag);
+                    }
+                    srcLeaseId = srcAbfsLease.getLeaseID();
+                    if (!isAtomicRenameRecovery && pathInformation.getIsDirectory()) {
+                        /*
+                         * if it is not a resume of a previous failed atomic rename operation,
+                         * perform the pre-rename operation.
+                         */
+                        renameAtomicity = getRenameAtomicity(pathInformation);
+                        renameAtomicity.preRename();
+                    }
+                }
+                if (pathInformation.getIsDirectory()) {
+                    result = listRecursiveAndTakeAction() && finalSrcRename();
+                } else {
+                    result = renameInternal(src, dst);
+                }
+            } finally {
+                if (srcAbfsLease != null) {
+                    // If the operation is successful, cancel the timer and no need to release
+                    // the lease as delete on the blob-path has taken place.
+                    if (result) {
+                        srcAbfsLease.cancelTimer();
+                    } else {
+                        srcAbfsLease.free();
+                    }
+                }
+            }
+            if (result && renameAtomicity != null) {
+                renameAtomicity.postRename();
+            }
+        }
+        return result;
+    }
+
+    /** Final rename operation after all the blobs have been copied.
+     *
+     * @return true if rename is successful
+     * @throws AzureBlobFileSystemException if server call fails
+     */
+    private boolean finalSrcRename() throws AzureBlobFileSystemException {
+        tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1);
+        try {
+            return renameInternal(src, dst);
+        } finally {
+            tracingContext.setOperatedBlobCount(null);
+        }
+    }
+
+    /** Gets the rename atomicity object.
+     *
+     * @param pathInformation object containing the path information of the source path
+     *
+     * @return RenameAtomicity object
+     */
+    @VisibleForTesting
+    public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation) {
+        return new RenameAtomicity(src,
+                dst,
+                new Path(src.getParent(), src.getName() + RenameAtomicity.SUFFIX),
+                tracingContext,
+                pathInformation.getETag(),
+                getAbfsClient());
+    }
+
+    /** Takes a lease on the path.
+     *
+     * @param path path on which the lease is to be taken
+     * @param eTag eTag of the path
+     *
+     * @return object containing the lease information
+     * @throws AzureBlobFileSystemException if server call fails
+     */
+    private AbfsLease takeLease(final Path path, final String eTag)
+            throws AzureBlobFileSystemException {
+        AbfsLease lease = new AbfsLease(getAbfsClient(), path.toUri().getPath(), false,
+                getAbfsClient().getAbfsConfiguration()
+                        .getAtomicRenameLeaseRefreshDuration(),
+                eTag, tracingContext);
+        leases.add(lease);
+        return lease;
+    }
+
+    /** Checks if the path contains a colon.
+     *
+     * @param p path to check
+     *
+     * @return true if the path contains a colon
+     */
+    private boolean containsColon(Path p) {
+        return p.toUri().getPath().contains(":");
+    }
+
+    /**
+     * Since the server does not have a rename API and cannot check HDFS
+     * contracts, the client has to ensure that no HDFS contract is violated.
+     *
+     * @param src source path
+     * @param dst destination path
+     * @param pathInformation object in which path information of the source path would be stored
+     *
+     * @return true if the pre-checks pass
+     * @throws AzureBlobFileSystemException if server call fails or given paths are invalid.
+     */
+    private boolean preCheck(final Path src, final Path dst,
+                             final PathInformation pathInformation)
+            throws AzureBlobFileSystemException {
+        validateDestinationPath(src, dst);
+
+        setSrcPathInformation(src, pathInformation);
+        validateSourcePath(pathInformation);
+        validateDestinationPathNotExist(src, dst, pathInformation);
+        validateDestinationParentExist(src, dst, pathInformation);
+
+        return true;
+    }
+
+    /**
+     * Validate if the format of the destination path is correct and if the destination
+     * path is not a sub-directory of the source path.
+     *
+     * @param src source path
+     * @param dst destination path
+     *
+     * @throws AbfsRestOperationException if the destination path is invalid
+     */
+    private void validateDestinationPath(final Path src, final Path dst)
+            throws AbfsRestOperationException {
+        if (containsColon(dst)) {
+            throw new AbfsRestOperationException(
+                    HttpURLConnection.HTTP_BAD_REQUEST,
+                    AzureServiceErrorCode.INVALID_RENAME_DESTINATION.getErrorCode(), null,
+                    new PathIOException(dst.toUri().getPath(),
+                            "Destination path contains colon"));
+        }
+
+        validateDestinationIsNotSubDir(src, dst);
+    }
+
+    /**
+     * Validate if the destination path is not a sub-directory of the source path.
+     *
+     * @param src source path
+     * @param dst destination path
+     */
+    private void validateDestinationIsNotSubDir(final Path src,
+                                                final Path dst) throws AbfsRestOperationException {
+        LOG.debug("Check if the destination is subDirectory");
+        Path nestedDstParent = dst.getParent();
+        if (nestedDstParent != null && nestedDstParent.toUri()
+                .getPath()
+                .indexOf(src.toUri().getPath()) == 0) {
+            LOG.info("Rename src: {} dst: {} failed as dst is subDir of src",
+                    src, dst);
+            throw new AbfsRestOperationException(HttpURLConnection.HTTP_CONFLICT,
+                    AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode(),
+                    AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorMessage(),
+                    new Exception(
+                            AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode()));
+        }
+    }
+
+    /** Set the path information of the source path.
+     *
+     * @param src source path
+     * @param pathInformation object containing the path information of the source path
+     *
+     * @throws AzureBlobFileSystemException if server call fails
+     */
+    private void setSrcPathInformation(final Path src,
+                                       final PathInformation pathInformation)
+            throws AzureBlobFileSystemException {
+        pathInformation.copy(getPathInformation(src, tracingContext));
+    }
+
+    /**
+     * Validate that the source path exists and, if the client knows the ETag of
+     * the source path, that it matches the ETag on the server.
+     *
+     * @param pathInformation object containing the path information of the source path
+     *
+     * @throws AbfsRestOperationException if the source path is not found or if the ETag
+     *                                    of the source path does not match with the server.
+     */
+    private void validateSourcePath(final PathInformation pathInformation)
+            throws AzureBlobFileSystemException {
+        if (!pathInformation.getPathExists()) {
+            throw new AbfsRestOperationException(
+                    HttpURLConnection.HTTP_NOT_FOUND,
+                    AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), null,
+                    new Exception(
+                            AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode()));
+        }
+        if (srcEtag != null && !srcEtag.equals(pathInformation.getETag())) {
+            throw new AbfsRestOperationException(
+                    HttpURLConnection.HTTP_CONFLICT,
+                    AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null,
+                    new Exception(
+                            AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode()));
+        }
+    }
+
+    /** Validate if the destination path does not exist.
+     *
+     * @param src source path
+     * @param dst destination path
+     * @param pathInformation object containing the path information of the source path
+     *
+     * @throws AbfsRestOperationException if the destination path already exists
+     */
+    private void validateDestinationPathNotExist(final Path src,
+                                                 final Path dst,
+                                                 final PathInformation pathInformation)
+            throws AzureBlobFileSystemException {
+        /*
+         * The destination path name can be the same as the source path name only in the
+         * case of a directory rename.
+         *
+         * In case the directory is being renamed to some other name, the destination
+         * check would happen in the AzureBlobFileSystem#rename method.
+         */
+        if (pathInformation.getIsDirectory() && dst.getName()
+                .equals(src.getName())) {
+            PathInformation dstPathInformation = getPathInformation(
+                    dst,
+                    tracingContext);
+            if (dstPathInformation.getPathExists()) {
+                LOG.info(
+                        "Rename src: {} dst: {} failed as qualifiedDst already 
exists",
+                        src, dst);
+                throw new AbfsRestOperationException(
+                        HttpURLConnection.HTTP_CONFLICT,
+                        AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null,
+                        null);
+            }
+        }
+    }
+
+    /** Validate if the parent of the destination path exists.
+     *
+     * @param src source path
+     * @param dst destination path
+     * @param pathInformation object containing the path information of the source path
+     *
+     * @throws AbfsRestOperationException if the parent of the destination path does not exist
+     */
+    private void validateDestinationParentExist(final Path src,
+                                                final Path dst,
+                                                final PathInformation pathInformation)
+            throws AzureBlobFileSystemException {
+        final Path nestedDstParent = dst.getParent();
+        if (!dst.isRoot() && nestedDstParent != null && !nestedDstParent.isRoot()
+                && (
+                !pathInformation.getIsDirectory() || !dst.getName()
+                        .equals(src.getName()))) {
+            PathInformation nestedDstInfo = getPathInformation(
+                    nestedDstParent,
+                    tracingContext);
+            if (!nestedDstInfo.getPathExists() || !nestedDstInfo.getIsDirectory()) {
+                throw new AbfsRestOperationException(
+                        HttpURLConnection.HTTP_NOT_FOUND,
+                        RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode(), null,
+                        new Exception(
+                                RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()));
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    boolean takeAction(final Path path) throws AzureBlobFileSystemException {
+        return renameInternal(path,
+                createDestinationPathForBlobPartOfRenameSrcDir(dst, path, src));
+    }
+
+    /** Renames the source path to the destination path.
+     *
+     * @param path source path
+     * @param destinationPathForBlobPartOfRenameSrcDir destination path
+     *
+     * @return true if rename is successful
+     * @throws AzureBlobFileSystemException if server call fails
+     */
+    private boolean renameInternal(final Path path,
+                                   final Path destinationPathForBlobPartOfRenameSrcDir)
+            throws AzureBlobFileSystemException {
+        final String leaseId;
+        AbfsLease abfsLease = null;
+        if (isAtomicRename) {
+            /*
+             * To maintain atomicity of rename of the path, a lease is taken on the path.
+             */
+            if (path.equals(src)) {
+                abfsLease = srcAbfsLease;
+                leaseId = srcLeaseId;
+            } else {
+                abfsLease = takeLease(path, null);
+                leaseId = abfsLease.getLeaseID();
+            }
+        } else {
+            leaseId = null;
+        }
+        boolean operated = false;
+        try {
+            copyPath(path, destinationPathForBlobPartOfRenameSrcDir, leaseId);
+            getAbfsClient().deleteBlobPath(path, leaseId, tracingContext);
+            operated = true;
+        } finally {
+            if (abfsLease != null) {
+                // If the operation is successful, cancel the timer and no need to release
+                // the lease as delete on the blob-path has taken place.
+                if (operated) {
+                    abfsLease.cancelTimer();
+                } else {
+                    abfsLease.free();
+                }
+            }
+        }
+        operatedBlobCount.incrementAndGet();
+        return true;
+    }
+
+    /** Copies the source path to the destination path.
+     *
+     * @param src source path
+     * @param dst destination path
+     * @param leaseId lease id for the source path
+     *
+     * @throws AzureBlobFileSystemException if server call fails
+     */
+    private void copyPath(final Path src, final Path dst, final String leaseId)
+            throws AzureBlobFileSystemException {
+        String copyId;
+        try {
+            AbfsRestOperation copyPathOp = getAbfsClient().copyBlob(src, dst, leaseId,
+                    tracingContext);
+            final String progress = copyPathOp.getResult()
+                    .getResponseHeader(X_MS_COPY_STATUS);
+            if (COPY_STATUS_SUCCESS.equalsIgnoreCase(progress)) {
+                return;
+            }
+            copyId = copyPathOp.getResult()
+                    .getResponseHeader(X_MS_COPY_ID);
+        } catch (AbfsRestOperationException ex) {
+            if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+                AbfsRestOperation dstPathStatus = getAbfsClient().getPathStatus(
+                        dst.toUri().getPath(),
+                        tracingContext, null, false);
+                final String srcCopyPath = ROOT_PATH + getAbfsClient().getFileSystem()
+                        + src.toUri().getPath();
+                if (dstPathStatus.getResult() != null && (srcCopyPath.equals(

Review Comment:
   should have a check for dstPathStatus != null as well
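
A minimal sketch of what that extra guard could look like, built only from the snippet above. The tail of the original condition is truncated in the diff, so the comparison against the X_MS_COPY_SOURCE response header is an assumption here, not necessarily the PR's actual code:

```java
// Hedged sketch of the suggestion: also guard against the AbfsRestOperation
// itself being null before dereferencing getResult().
AbfsRestOperation dstPathStatus = getAbfsClient().getPathStatus(
        dst.toUri().getPath(), tracingContext, null, false);
final String srcCopyPath = ROOT_PATH + getAbfsClient().getFileSystem()
        + src.toUri().getPath();
if (dstPathStatus != null && dstPathStatus.getResult() != null
        && srcCopyPath.equals(dstPathStatus.getResult()
                .getResponseHeader(X_MS_COPY_SOURCE))) {  // assumed comparison target
    // ... continue with the conflict handling from the original code ...
}
```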





> ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint
> --------------------------------------------------------------------------
>
>                 Key: HADOOP-19233
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19233
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Anuj Modi
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we only support rename and delete operations on the DFS endpoint. 
> The reason for supporting rename and delete operations on the Blob endpoint 
> is that the Blob endpoint does not account for hierarchy. We need to ensure 
> that the HDFS contracts are maintained when performing rename and delete 
> operations. Renaming or deleting a directory over the Blob endpoint requires 
> the client to handle the orchestration and rename or delete all the blobs 
> within the specified directory.
>  
> The task outlines the considerations for implementing rename and delete 
> operations for the FNS-blob endpoint to ensure compatibility with HDFS 
> contracts.
>  * {*}Blob Endpoint Usage{*}: The task addresses the need for abstraction in 
> the code to maintain HDFS contracts while performing rename and delete 
> operations on the blob endpoint, which does not support hierarchy.
>  * {*}Rename Operations{*}: The {{AzureBlobFileSystem#rename()}} method will 
> use a {{RenameHandler}} instance to handle rename operations, with separate 
> handlers for the DFS and blob endpoints. This method includes prechecks, 
> destination adjustments, and orchestration of directory renaming for blobs.
>  * {*}Atomic Rename{*}: Atomic renaming is essential for blob endpoints, as 
> it requires orchestration to copy or delete each blob within the directory. A 
> configuration will allow developers to specify directories for atomic 
> renaming, with a JSON file to track the status of renames.
>  * {*}Delete Operations{*}: Delete operations are simpler than renames, 
> requiring fewer HDFS contract checks. For blob endpoints, the client must 
> handle orchestration, including managing orphaned directories created by 
> Az-copy.
>  * {*}Orchestration for Rename/Delete{*}: Orchestration for rename and delete 
> operations over blob endpoints involves listing blobs and performing actions 
> on each blob. The process must be optimized to handle large numbers of blobs 
> efficiently.
>  * {*}Need for Optimization{*}: Optimization is crucial because the 
> {{ListBlob}} API can return a maximum of 5000 blobs at once, necessitating 
> multiple calls for large directories. The task proposes a producer-consumer 
> model to handle blobs in parallel, thereby reducing processing time and 
> memory usage.
>  * {*}Producer-Consumer Design{*}: The proposed design includes a producer to 
> list blobs, a queue to store the blobs, and a consumer to process them in 
> parallel. This approach aims to improve efficiency and mitigate memory issues 
> (a rough sketch of this model follows after the description below).
> More details will follow
> Prerequisites for this Patch:
> 1. HADOOP-19187 ABFS: [FnsOverBlob]Making AbfsClient Abstract for supporting 
> both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. HADOOP-19226 ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob 
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. HADOOP-19207 ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs 
> and Metadata APIs - ASF JIRA (apache.org)


