steveloughran commented on code in PR #6726:
URL: https://github.com/apache/hadoop/pull/6726#discussion_r1572230146


##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md:
##########
@@ -161,124 +105,20 @@ store.hasPathCapability(path, 
"fs.capability.bulk.delete")
 
 ### Invocation through Reflection.
 
-The need for many Libraries to compile against very old versions of Hadoop
+The need for many libraries to compile against very old versions of Hadoop
 means that most of the cloud-first Filesystem API calls cannot be used except
 through reflection -And the more complicated The API and its data types are,
 The harder that reflection is to implement.
 
-To assist this, the class `org.apache.hadoop.fs.FileUtil` has two methods
+To assist this, the class `org.apache.hadoop.io.wrappedio.WrappedIO` has few 
methods
 which are intended to provide simple access to the API, especially
 through reflection.
 
 ```java
-  /**
-   * Get the maximum number of objects/files to delete in a single request.
-   * @param fs filesystem
-   * @param path path to delete under.
-   * @return a number greater than or equal to zero.
-   * @throws UnsupportedOperationException bulk delete under that path is not 
supported.
-   * @throws IllegalArgumentException path not valid.
-   * @throws IOException problems resolving paths
-   */
+
   public static int bulkDeletePageSize(FileSystem fs, Path path) throws 
IOException;
   
-  /**
-   * Delete a list of files/objects.
-   * <ul>
-   *   <li>Files must be under the path provided in {@code base}.</li>
-   *   <li>The size of the list must be equal to or less than the page 
size.</li>
-   *   <li>Directories are not supported; the outcome of attempting to delete
-   *       directories is undefined (ignored; undetected, listed as 
failures...).</li>
-   *   <li>The operation is not atomic.</li>
-   *   <li>The operation is treated as idempotent: network failures may
-   *        trigger resubmission of the request -any new objects created under 
a
-   *        path in the list may then be deleted.</li>
-   *    <li>There is no guarantee that any parent directories exist after this 
call.
-   *    </li>
-   * </ul>
-   * @param fs filesystem
-   * @param base path to delete under.
-   * @param paths list of paths which must be absolute and under the base path.
-   * @return a list of all the paths which couldn't be deleted for a reason 
other than "not found" and any associated error message.
-   * @throws UnsupportedOperationException bulk delete under that path is not 
supported.
-   * @throws IOException IO problems including networking, authentication and 
more.
-   * @throws IllegalArgumentException if a path argument is invalid.
-   */
-  public static List<Map.Entry<Path, String>> bulkDelete(FileSystem fs, Path 
base, List<Path> paths)
-```
+  public static int bulkDeletePageSize(FileSystem fs, Path path) throws 
IOException;
 
-## S3A Implementation
-
-The S3A client exports this API.

Review Comment:
   this needs to be covered, along with the default implementation "maps to 
delete(path, false)"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefaultBulkDeleteOperation.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Default implementation of the {@link BulkDelete} interface.
+ */
+public class DefaultBulkDeleteOperation implements BulkDelete {
+
+    private final int pageSize;
+
+    private final Path basePath;
+
+    private final FileSystem fs;
+
+    public DefaultBulkDeleteOperation(int pageSize,
+                                      Path basePath,
+                                      FileSystem fs) {
+        checkArgument(pageSize == 1, "Page size must be equal to 1");
+        this.pageSize = pageSize;
+        this.basePath = requireNonNull(basePath);
+        this.fs = fs;
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    @Override
+    public Path basePath() {
+        return basePath;
+    }
+
+    @Override
+    public List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
+            throws IOException, IllegalArgumentException {
+        validateBulkDeletePaths(paths, pageSize, basePath);
+        List<Map.Entry<Path, String>> result = new ArrayList<>();
+        // this for loop doesn't make sense as pageSize must be 1.
+        for (Path path : paths) {

Review Comment:
   there's only ever going to be 1 entry here



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefaultBulkDeleteOperation.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Default implementation of the {@link BulkDelete} interface.
+ */
+public class DefaultBulkDeleteOperation implements BulkDelete {
+
+    private final int pageSize;
+
+    private final Path basePath;
+
+    private final FileSystem fs;
+
+    public DefaultBulkDeleteOperation(int pageSize,
+                                      Path basePath,
+                                      FileSystem fs) {
+        checkArgument(pageSize == 1, "Page size must be equal to 1");
+        this.pageSize = pageSize;
+        this.basePath = requireNonNull(basePath);
+        this.fs = fs;
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    @Override
+    public Path basePath() {
+        return basePath;
+    }
+
+    @Override
+    public List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
+            throws IOException, IllegalArgumentException {
+        validateBulkDeletePaths(paths, pageSize, basePath);
+        List<Map.Entry<Path, String>> result = new ArrayList<>();
+        // this for loop doesn't make sense as pageSize must be 1.
+        for (Path path : paths) {
+            try {
+                fs.delete(path, false);
+                // What to do if this return false?
+                // I think we should add the path to the result list with 
value "Not Deleted".

Review Comment:
   good q. I'd say yes. Or actually do a getFileStatus() call and see what is 
there:
   * file doesn't exist (not an error)
   * path is a directory: add to result
   
   key is that file not found isn't escalated



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation.
+ */
+public abstract class AbstractContractBulkDeleteTest extends 
AbstractFSContractTestBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);
+
+    protected int pageSize;
+
+    protected Path basePath;
+
+    protected FileSystem fs;
+
+    @Before
+    public void setUp() throws Exception {
+        fs = getFileSystem();
+        basePath = path(getClass().getName());
+        pageSize = WrappedIO.bulkDeletePageSize(getFileSystem(), basePath);
+        fs.mkdirs(basePath);
+    }
+
+    public Path getBasePath() {
+        return basePath;
+    }
+
+    /**
+     * Validate the page size for bulk delete operation. Different stores can 
have different
+     * implementations for bulk delete operation thus different page size.
+     */
+    @Test
+    public void validatePageSize() throws Exception {
+        Assertions.assertThat(pageSize)
+                .describedAs("Page size should be 1 by default for all stores")
+                .isEqualTo(1);
+    }
+
+    @Test
+    public void testPathsSizeEqualsPageSizePrecondition() throws Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
+        // Bulk delete call should pass with no exception.
+        bulkDelete(getFileSystem(), basePath, listOfPaths);
+    }
+
+    @Test
+    public void testPathsSizeGreaterThanPageSizePrecondition() throws 
Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize + 1, basePath);
+        intercept(IllegalArgumentException.class,
+                () -> bulkDelete(getFileSystem(), basePath, listOfPaths));
+    }
+
+    @Test
+    public void testPathsSizeLessThanPageSizePrecondition() throws Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize - 1, basePath);
+        // Bulk delete call should pass with no exception.
+        bulkDelete(getFileSystem(), basePath, listOfPaths);
+    }
+
+    @Test
+    public void testBulkDeleteSuccessful() throws Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
+        for (Path path : listOfPaths) {
+            touch(fs, path);
+        }
+        FileStatus[] fileStatuses = fs.listStatus(basePath);
+        Assertions.assertThat(fileStatuses)
+                .describedAs("File count after create")
+                .hasSize(pageSize);
+        assertSuccessfulBulkDelete(
+            bulkDelete(getFileSystem(), basePath, listOfPaths));
+        FileStatus[] fileStatusesAfterDelete = fs.listStatus(basePath);
+        Assertions.assertThat(fileStatusesAfterDelete)
+                .describedAs("File statuses should be empty after delete")
+                .isEmpty();
+    }
+
+    @Test
+    public void validatePathCapabilityDeclared() throws Exception {
+        Assertions.assertThat(fs.hasPathCapability(basePath, 
CommonPathCapabilities.BULK_DELETE))
+                .describedAs("Path capability BULK_DELETE should be declared")
+                .isTrue();
+    }
+
+    @Test
+    public void testDeletePathsNotUnderBase() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path pathNotUnderBase = path("not-under-base");
+        paths.add(pathNotUnderBase);
+        // Should fail as path is not under the base path.
+        intercept(IllegalArgumentException.class,
+                () -> bulkDelete(getFileSystem(), basePath, paths));
+    }
+
+    @Test
+    public void testDeletePathsNotAbsolute() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path pathNotAbsolute = new Path("not-absolute");
+        paths.add(pathNotAbsolute);
+        // Should fail as path is not absolute.
+        intercept(IllegalArgumentException.class,
+                () -> bulkDelete(getFileSystem(), basePath, paths));
+    }
+
+    @Test
+    public void testDeletePathsNotExists() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path pathNotExists = new Path(basePath, "not-exists");
+        paths.add(pathNotExists);
+        // bulk delete call doesn't verify if a path exist or not before 
deleting.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeletePathsDirectory() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path dirPath = new Path(basePath, "dir");
+        fs.mkdirs(dirPath);
+        paths.add(dirPath);
+        Path filePath = new Path(dirPath, "file");
+        touch(fs, filePath);
+        paths.add(filePath);
+        pageSizePreconditionForTest(paths.size());
+        // Outcome is undefined. But call shouldn't fail. In case of S3 
directories will still be present.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeleteEmptyDirectory() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path emptyDirPath = new Path(basePath, "empty-dir");
+        fs.mkdirs(emptyDirPath);
+        paths.add(emptyDirPath);
+        // Should pass as empty directory.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeleteEmptyList() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        // Empty list should pass.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeleteSamePathsMoreThanOnce() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path path = new Path(basePath, "file");
+        touch(fs, path);
+        paths.add(path);
+        paths.add(path);
+        Path another = new Path(basePath, "another-file");
+        touch(fs, another);
+        paths.add(another);
+        pageSizePreconditionForTest(paths.size());

Review Comment:
   build the paths and check this precondition before the touch() calls; this 
saves any network IO before skipping



##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md:
##########
@@ -0,0 +1,124 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# <a name="BulkDelete"></a> interface `BulkDelete`
+
+<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
+
+The `BulkDelete` interface provides an API to perform bulk delete of 
files/objects
+in an object store or filesystem.
+
+## Key Features
+
+* An API for submitting a list of paths to delete.
+* This list must be no larger than the "page size" supported by the client; 
This size is also exposed as a method.
+* Triggers a request to delete files at the specific paths.
+* Returns a list of which paths were reported as delete failures by the store.
+* Does not consider a nonexistent file to be a failure.
+* Does not offer any atomicity guarantees.
+* Idempotency guarantees are weak: retries may delete files newly created by 
other clients.
+* Provides no guarantees as to the outcome if a path references a directory.
+* Provides no guarantees that parent directories will exist after the call.
+
+
+The API is designed to match the semantics of the AWS S3 [Bulk 
Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) 
REST API call, but it is not
+exclusively restricted to this store. This is why the "provides no guarantees"
+restrictions do not state what the outcome will be when executed on other 
stores.
+
+### Interface `org.apache.hadoop.fs.BulkDeleteSource`

Review Comment:
   * cover the default implementation: "page size 1, maps to delete(path, false)"
   * add a reference to the s3a performance doc (how do we calculate the 
path?), so that people know where to look



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Store Layer.
+ * This is where lower level storage operations are intended
+ * to move.
+ */
+public class S3AStoreImpl implements S3AStore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(S3AStoreImpl.class);
+
+  private final StoreContextFactory storeContextFactory;
+
+  private final S3Client s3Client;
+
+  private final String bucket;
+
+  private final RequestFactory requestFactory;
+
+  /** Async client is used for transfer manager. */
+  private S3AsyncClient s3AsyncClient;
+
+  private final DurationTrackerFactory durationTrackerFactory;
+
+  /** The core instrumentation. */
+  private final S3AInstrumentation instrumentation;
+
+  /** Accessors to statistics for this FS. */
+  private final S3AStatisticsContext statisticsContext;
+
+  /** Storage Statistics Bonded to the instrumentation. */
+  private final S3AStorageStatistics storageStatistics;
+
+  private final RateLimiting readRateLimiter;
+
+  private final RateLimiting writeRateLimiter;
+
+  private final StoreContext storeContext;
+
+  private final Invoker invoker;
+
+  private final AuditSpanSource<AuditSpanS3A> auditSpanSource;
+
+  S3AStoreImpl(StoreContextFactory storeContextFactory,
+      S3Client s3Client,
+      DurationTrackerFactory durationTrackerFactory,
+      S3AInstrumentation instrumentation,
+      S3AStatisticsContext statisticsContext,
+      S3AStorageStatistics storageStatistics,
+      RateLimiting readRateLimiter,
+      RateLimiting writeRateLimiter,
+      AuditSpanSource<AuditSpanS3A> auditSpanSource) {
+    this.storeContextFactory = requireNonNull(storeContextFactory);
+    this.s3Client = requireNonNull(s3Client);
+    this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+    this.instrumentation = requireNonNull(instrumentation);
+    this.statisticsContext = requireNonNull(statisticsContext);
+    this.storageStatistics = requireNonNull(storageStatistics);
+    this.readRateLimiter = requireNonNull(readRateLimiter);
+    this.writeRateLimiter = requireNonNull(writeRateLimiter);
+    this.auditSpanSource = requireNonNull(auditSpanSource);
+    this.storeContext = 
requireNonNull(storeContextFactory.createStoreContext());
+    this.invoker = storeContext.getInvoker();
+    this.bucket = storeContext.getBucket();
+    this.requestFactory = storeContext.getRequestFactory();
+  }
+
+  @Override
+  public Duration acquireWriteCapacity(final int capacity) {
+    return writeRateLimiter.acquire(capacity);
+  }
+
+  @Override
+  public Duration acquireReadCapacity(final int capacity) {
+    return readRateLimiter.acquire(capacity);
+
+  }
+
+  /**
+   * Create the store context.
+   * @return a new store context.
+   */
+  private StoreContext createStoreContext() {
+    return storeContextFactory.createStoreContext();
+  }
+
+  @Override
+  public StoreContext getStoreContext() {
+    return storeContext;
+  }
+
+  private S3Client getS3Client() {
+    return s3Client;
+  }
+
+  @Override
+  public DurationTrackerFactory getDurationTrackerFactory() {
+    return durationTrackerFactory;
+  }
+
+  private S3AInstrumentation getInstrumentation() {
+    return instrumentation;
+  }
+
+  @Override
+  public S3AStatisticsContext getStatisticsContext() {
+    return statisticsContext;
+  }
+
+  private S3AStorageStatistics getStorageStatistics() {
+    return storageStatistics;
+  }
+
+  @Override
+  public RequestFactory getRequestFactory() {
+    return requestFactory;
+  }
+
+  /**
+   * Increment a statistic by 1.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   */
+  protected void incrementStatistic(Statistic statistic) {
+    incrementStatistic(statistic, 1);
+  }
+
+  protected void incrementDurationStatistic(Statistic statistic, Duration 
duration) {
+    statisticsContext.addValueToQuantiles(statistic, duration.toMillis());
+  }
+
+  /**
+   * Increment a statistic by a specific value.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementStatistic(Statistic statistic, long count) {
+    statisticsContext.incrementCounter(statistic, count);
+  }
+
+  /**
+   * Decrement a gauge by a specific value.
+   * @param statistic The operation to decrement
+   * @param count the count to decrement
+   */
+  protected void decrementGauge(Statistic statistic, long count) {
+    statisticsContext.decrementGauge(statistic, count);
+  }
+
+  /**
+   * Increment a gauge by a specific value.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementGauge(Statistic statistic, long count) {
+    statisticsContext.incrementGauge(statistic, count);
+  }
+
+  /**
+   * Callback when an operation was retried.
+   * Increments the statistics of ignored errors or throttled requests,
+   * depending up on the exception class.
+   * @param ex exception.
+   */
+  public void operationRetried(Exception ex) {
+    if (isThrottleException(ex)) {
+      LOG.debug("Request throttled");
+      incrementStatistic(STORE_IO_THROTTLED);
+      statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
+    } else {
+      incrementStatistic(STORE_IO_RETRY);
+      incrementStatistic(IGNORED_ERRORS);
+    }
+  }
+
+  /**
+   * Callback from {@link Invoker} when an operation is retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param retries number of retries
+   * @param idempotent is the method idempotent
+   */
+  public void operationRetried(String text, Exception ex, int retries, boolean 
idempotent) {
+    operationRetried(ex);
+  }
+
+  /**
+   * Get the instrumentation's IOStatistics.
+   * @return statistics
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return instrumentation.getIOStatistics();
+  }
+
+  /**
+   * Start an operation; this informs the audit service of the event
+   * and then sets it as the active span.
+   * @param operation operation name.
+   * @param path1 first path of operation
+   * @param path2 second path of operation
+   * @return a span for the audit
+   * @throws IOException failure
+   */
+  public AuditSpanS3A createSpan(String operation, @Nullable String path1, 
@Nullable String path2)
+      throws IOException {
+
+    return auditSpanSource.createSpan(operation, path1, path2);
+  }
+
+  /**
+   * Reject any request to delete an object where the key is root.
+   * @param key key to validate
+   * @throws IllegalArgumentException if the request was rejected due to
+   * a mistaken attempt to delete the root directory.
+   */
+  private void blockRootDelete(String key) throws IllegalArgumentException {
+    checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be 
deleted", bucket);
+  }
+
+  /**
+   * Perform a bulk object delete operation against S3.
+   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+   * operation statistics
+   * <p>
+   * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+   * of objects deleted in the request.
+   * <p>
+   * Retry policy: retry untranslated; delete considered idempotent.
+   * If the request is throttled, this is logged in the throttle statistics,
+   * with the counter set to the number of keys, rather than the number
+   * of invocations of the delete operation.
+   * This is because S3 considers each key as one mutating operation on
+   * the store when updating its load counters on a specific partition
+   * of an S3 bucket.
+   * If only the request was measured, this operation would under-report.
+   * @param deleteRequest keys to delete on the s3-backend
+   * @return the AWS response
+   * @throws IllegalArgumentException if the request was rejected due to
+   * a mistaken attempt to delete the root directory
+   * @throws SdkException amazon-layer failure.
+   */
+  @Override
+  @Retries.RetryRaw
+  public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(final 
DeleteObjectsRequest deleteRequest)
+      throws SdkException {
+
+    DeleteObjectsResponse response;
+    BulkDeleteRetryHandler retryHandler = new 
BulkDeleteRetryHandler(createStoreContext());
+
+    final List<ObjectIdentifier> keysToDelete = 
deleteRequest.delete().objects();
+    int keyCount = keysToDelete.size();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initiating delete operation for {} objects", 
keysToDelete.size());
+      keysToDelete.stream().forEach(objectIdentifier -> {
+        LOG.debug(" \"{}\" {}", objectIdentifier.key(),
+            objectIdentifier.versionId() != null ? 
objectIdentifier.versionId() : "");
+      });
+    }
+    // block root calls
+    
keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete);
+
+    try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", 
keyCount)) {
+      response =
+          invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT, 
(text, e, r, i) -> {
+                // handle the failure
+                retryHandler.bulkDeleteRetried(deleteRequest, e);
+              },
+              // duration is tracked in the bulk delete counters
+              trackDurationOfOperation(getDurationTrackerFactory(),
+                  OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
+                        Duration durationToAcquireWriteCapacity = 
acquireWriteCapacity(keyCount);

Review Comment:
   add comment that this is where write capacity is acquired.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation for S3A Implementation.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ITestS3AContractBulkDelete.class);
+
+    /**
+     * Delete Page size: {@value}.
+     * This is the default page size for bulk delete operation for this 
contract test.
+     * All the tests in this class should pass number of paths equal to or 
less than
+     * this page size during the bulk delete operation.
+     */
+    private static final int DELETE_PAGE_SIZE = 20;
+
+    private final boolean enableMultiObjectDelete;
+
+    @Parameterized.Parameters(name = "enableMultiObjectDelete = {0}")
+    public static Iterable<Object[]> enableMultiObjectDelete() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+        });
+    }
+
+    public ITestS3AContractBulkDelete(boolean enableMultiObjectDelete) {
+        this.enableMultiObjectDelete = enableMultiObjectDelete;
+    }
+
+    @Override
+    protected Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+        S3ATestUtils.disableFilesystemCaching(conf);
+        S3ATestUtils.removeBaseAndBucketOverrides(conf,
+                Constants.BULK_DELETE_PAGE_SIZE);
+        conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE);
+        conf.setBoolean(Constants.ENABLE_MULTI_DELETE, 
enableMultiObjectDelete);
+        return conf;
+    }
+
+    @Override
+    protected AbstractFSContract createContract(Configuration conf) {
+        return new S3AContract(createConfiguration());
+    }
+
+    @Override
+    public void validatePageSize() throws Exception {
+        int targetPageSize = DELETE_PAGE_SIZE;

Review Comment:
   how about making a method getExpectedPageSize(); base implementation returns 
"1", s3a one returns 1 or DELETE_PAGE_SIZE



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation for S3A Implementation.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ITestS3AContractBulkDelete.class);
+
+    /**
+     * Delete Page size: {@value}.
+     * This is the default page size for bulk delete operation for this 
contract test.
+     * All the tests in this class should pass number of paths equal to or 
less than
+     * this page size during the bulk delete operation.
+     */
+    private static final int DELETE_PAGE_SIZE = 20;
+
+    private final boolean enableMultiObjectDelete;
+
+    @Parameterized.Parameters(name = "enableMultiObjectDelete = {0}")
+    public static Iterable<Object[]> enableMultiObjectDelete() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+        });
+    }
+
+    public ITestS3AContractBulkDelete(boolean enableMultiObjectDelete) {
+        this.enableMultiObjectDelete = enableMultiObjectDelete;
+    }
+
+    @Override
+    protected Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+        S3ATestUtils.disableFilesystemCaching(conf);
+        S3ATestUtils.removeBaseAndBucketOverrides(conf,
+                Constants.BULK_DELETE_PAGE_SIZE);
+        conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE);
+        conf.setBoolean(Constants.ENABLE_MULTI_DELETE, 
enableMultiObjectDelete);
+        return conf;
+    }
+
+    @Override
+    protected AbstractFSContract createContract(Configuration conf) {
+        return new S3AContract(createConfiguration());
+    }
+
+    @Override
+    public void validatePageSize() throws Exception {
+        int targetPageSize = DELETE_PAGE_SIZE;
+        if (!enableMultiObjectDelete) {
+            // if multi-object delete is disabled, page size should be 1.
+            targetPageSize = 1;
+        }
+        Assertions.assertThat(pageSize)
+                .describedAs("Page size should match the configured page size")
+                .isEqualTo(targetPageSize);
+    }
+
+    @Test
+    public void testBulkDeleteZeroPageSizePrecondition() throws Exception {
+        if(!enableMultiObjectDelete) {
+            // if multi-object delete is disabled, skip this test as
+            // page size is always 1.
+            skip("Multi-object delete is disabled");
+        }
+        Configuration conf = getContract().getConf();
+        conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, 0);
+        Path testPath = path(getMethodName());
+        try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+            intercept(IllegalArgumentException.class,
+                    () -> fs.createBulkDelete(testPath));
+        }
+    }
+
+    @Test
+    public void testPageSizeWhenMultiObjectsDisabled() throws Exception {
+        Configuration conf = getContract().getConf();
+        conf.setBoolean(Constants.ENABLE_MULTI_DELETE, false);
+        Path testPath = path(getMethodName());
+        try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+            BulkDelete bulkDelete = fs.createBulkDelete(testPath);
+            Assertions.assertThat(bulkDelete.pageSize())
+                    .describedAs("Page size should be 1 when multi-object 
delete is disabled")
+                    .isEqualTo(1);
+        }
+    }
+
+    @Override
+    public void testDeletePathsDirectory() throws Exception {

Review Comment:
   * skip adding this second path without multi-object delete, so we can verify 
L153 assertion always holds
   * add a variant where the path deleted is a parent of the directory



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java:
##########
@@ -702,6 +709,117 @@ public void testPartialDeleteSingleDelete() throws 
Throwable {
     executePartialDelete(createAssumedRoleConfig(), true);
   }
 
+  @Test
+  public void testBulkDeleteOnReadOnlyAccess() throws Throwable {
+    describe("Bulk delete with part of the child tree read only");
+    executeBulkDeleteOnReadOnlyFiles(createAssumedRoleConfig());
+  }
+
+  @Test
+  public void testBulkDeleteWithReadWriteAccess() throws Throwable {
+    describe("Bulk delete with read write access");
+    executeBulkDeleteOnSomeReadOnlyFiles(createAssumedRoleConfig());
+  }
+
+  /**
+   * Execute bulk delete on read only files and some read write files.
+   */
+  private void executeBulkDeleteOnReadOnlyFiles(Configuration 
assumedRoleConfig) throws Exception {
+    Path destDir = methodPath();
+    Path readOnlyDir = new Path(destDir, "readonlyDir");
+
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    WrappedIO.bulkDelete(fs, destDir, new ArrayList<>());
+
+    bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
+    roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);
+
+    int range = 10;

Review Comment:
   on a store where bulk delete page size == 1, use that as the range



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.fs.*;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractBulkDeleteTest extends 
AbstractFSContractTestBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);
+
+    protected int pageSize;
+
+    protected Path basePath;
+
+    protected FileSystem fs;
+
+    @Before
+    public void setUp() throws Exception {
+        fs = getFileSystem();
+        basePath = path(getClass().getName());

Review Comment:
   problem is that abfs is running test methods in parallel, and unless path() 
invokes methodPath() the methods all work with the same basePath, so one method 
can clean up while other methods are still active



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefaultBulkDeleteOperation.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Default implementation of the {@link BulkDelete} interface.
+ */
+public class DefaultBulkDeleteOperation implements BulkDelete {
+
+    private final int pageSize;

Review Comment:
   this is always 1, isn't it? so much can be simplified here
   * no need for the field
   * no need to pass it in the constructor
   * pageSize() to return 1



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md:
##########
@@ -94,6 +94,85 @@ on the client requirements.
 </property>
 ```
 
+## <a name="bulkdelete"></a> Improving delete performance through bulkdelete 
API.
+
+For bulk delete API spec refer to [BulkDelete](bulkdelete.html).

Review Comment:
   generate site and make sure things line up, or just say "filesystem 
specification"



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -5768,4 +5739,29 @@ public boolean isMultipartUploadEnabled() {
     return isMultipartUploadEnabled;
   }
 
+  @Override
+  public BulkDelete createBulkDelete(final Path path)
+      throws IllegalArgumentException, IOException {
+
+    final Path p = makeQualified(path);
+    final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null);
+    final int size = enableMultiObjectsDelete ? pageSize : 1;
+    return new BulkDeleteOperation(
+        createStoreContext(),
+        createBulkDeleteCallbacks(p, size, span),
+        p,
+        size,
+        span);
+  }
+
+  /**
+   * Create the callbacks for the bulk delete operation.
+   * @param span span for operations.

Review Comment:
   nit: add the javadocs to keep yetus happy.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * Interface for the S3A Store;
+ * S3 client interactions should be via this; mocking
+ * is possible for unit tests.
+ */
+@InterfaceAudience.LimitedPrivate("Extensions")
+@InterfaceStability.Unstable
+public interface S3AStore extends IOStatisticsSource {
+
+  /**
+   * Acquire write capacity for operations.
+   * This should be done within retry loops.
+   * @param capacity capacity to acquire.
+   * @return time spent waiting for output.
+   */
+  Duration acquireWriteCapacity(int capacity);
+
+  /**
+   * Acquire read capacity for operations.
+   * This should be done within retry loops.
+   * @param capacity capacity to acquire.
+   * @return time spent waiting for output.
+   */
+  Duration acquireReadCapacity(int capacity);
+
+  StoreContext getStoreContext();
+
+  DurationTrackerFactory getDurationTrackerFactory();
+
+  S3AStatisticsContext getStatisticsContext();
+
+  RequestFactory getRequestFactory();
+
+  /**
+   * Perform a bulk object delete operation against S3.
+   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+   * operation statistics
+   * <p>
+   * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+   * of objects deleted in the request.
+   * <p>
+   * Retry policy: retry untranslated; delete considered idempotent.
+   * If the request is throttled, this is logged in the throttle statistics,
+   * with the counter set to the number of keys, rather than the number
+   * of invocations of the delete operation.
+   * This is because S3 considers each key as one mutating operation on
+   * the store when updating its load counters on a specific partition
+   * of an S3 bucket.
+   * If only the request was measured, this operation would under-report.
+   * @param deleteRequest keys to delete on the s3-backend

Review Comment:
   add sentence explaining that write capacity will be requested proportional 
to the size of the payload, and re-requested on each retry attempt, so that 
retries throttle better



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation.
+ */
+public abstract class AbstractContractBulkDeleteTest extends 
AbstractFSContractTestBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);
+
+    protected int pageSize;
+
+    protected Path basePath;
+
+    protected FileSystem fs;
+
+    @Before
+    public void setUp() throws Exception {
+        fs = getFileSystem();
+        basePath = path(getClass().getName());
+        pageSize = WrappedIO.bulkDeletePageSize(getFileSystem(), basePath);
+        fs.mkdirs(basePath);
+    }
+
+    public Path getBasePath() {
+        return basePath;
+    }
+
+    /**
+     * Validate the page size for bulk delete operation. Different stores can 
have different
+     * implementations for bulk delete operation thus different page size.
+     */
+    @Test
+    public void validatePageSize() throws Exception {
+        Assertions.assertThat(pageSize)
+                .describedAs("Page size should be 1 by default for all stores")

Review Comment:
   isGreaterThanOrEqualTo(1), i.e. assert the page size is at least 1



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Store Layer.
+ * This is where lower level storage operations are intended
+ * to move.
+ */
+public class S3AStoreImpl implements S3AStore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(S3AStoreImpl.class);
+
+  private final StoreContextFactory storeContextFactory;

Review Comment:
   can you add minimal javadocs for these, even though they are mostly just a 
duplicate of classname/variable



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Store Layer.
+ * This is where lower level storage operations are intended
+ * to move.
+ */
+public class S3AStoreImpl implements S3AStore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(S3AStoreImpl.class);
+
+  private final StoreContextFactory storeContextFactory;
+
+  private final S3Client s3Client;
+
+  private final String bucket;
+
+  private final RequestFactory requestFactory;
+
+  /** Async client is used for transfer manager. */
+  private S3AsyncClient s3AsyncClient;
+
+  private final DurationTrackerFactory durationTrackerFactory;
+
+  /** The core instrumentation. */
+  private final S3AInstrumentation instrumentation;
+
+  /** Accessors to statistics for this FS. */
+  private final S3AStatisticsContext statisticsContext;
+
+  /** Storage Statistics Bonded to the instrumentation. */
+  private final S3AStorageStatistics storageStatistics;
+
+  private final RateLimiting readRateLimiter;
+
+  private final RateLimiting writeRateLimiter;
+
+  private final StoreContext storeContext;
+
+  private final Invoker invoker;
+
+  private final AuditSpanSource<AuditSpanS3A> auditSpanSource;
+
+  S3AStoreImpl(StoreContextFactory storeContextFactory,
+      S3Client s3Client,
+      DurationTrackerFactory durationTrackerFactory,
+      S3AInstrumentation instrumentation,
+      S3AStatisticsContext statisticsContext,
+      S3AStorageStatistics storageStatistics,
+      RateLimiting readRateLimiter,
+      RateLimiting writeRateLimiter,
+      AuditSpanSource<AuditSpanS3A> auditSpanSource) {
+    this.storeContextFactory = requireNonNull(storeContextFactory);
+    this.s3Client = requireNonNull(s3Client);
+    this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+    this.instrumentation = requireNonNull(instrumentation);
+    this.statisticsContext = requireNonNull(statisticsContext);
+    this.storageStatistics = requireNonNull(storageStatistics);
+    this.readRateLimiter = requireNonNull(readRateLimiter);
+    this.writeRateLimiter = requireNonNull(writeRateLimiter);
+    this.auditSpanSource = requireNonNull(auditSpanSource);
+    this.storeContext = 
requireNonNull(storeContextFactory.createStoreContext());
+    this.invoker = storeContext.getInvoker();
+    this.bucket = storeContext.getBucket();
+    this.requestFactory = storeContext.getRequestFactory();
+  }
+
+  @Override
+  public Duration acquireWriteCapacity(final int capacity) {
+    return writeRateLimiter.acquire(capacity);
+  }
+
+  @Override
+  public Duration acquireReadCapacity(final int capacity) {
+    return readRateLimiter.acquire(capacity);
+
+  }
+
+  /**
+   * Create the store context.
+   * @return a new store context.
+   */
+  private StoreContext createStoreContext() {
+    return storeContextFactory.createStoreContext();
+  }
+
+  @Override
+  public StoreContext getStoreContext() {
+    return storeContext;
+  }
+
+  private S3Client getS3Client() {
+    return s3Client;
+  }
+
+  @Override
+  public DurationTrackerFactory getDurationTrackerFactory() {
+    return durationTrackerFactory;
+  }
+
+  private S3AInstrumentation getInstrumentation() {
+    return instrumentation;
+  }
+
+  @Override
+  public S3AStatisticsContext getStatisticsContext() {
+    return statisticsContext;
+  }
+
+  private S3AStorageStatistics getStorageStatistics() {
+    return storageStatistics;
+  }
+
+  @Override
+  public RequestFactory getRequestFactory() {
+    return requestFactory;
+  }
+
+  /**
+   * Increment a statistic by 1.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   */
+  protected void incrementStatistic(Statistic statistic) {
+    incrementStatistic(statistic, 1);
+  }
+
+  protected void incrementDurationStatistic(Statistic statistic, Duration 
duration) {
+    statisticsContext.addValueToQuantiles(statistic, duration.toMillis());
+  }
+
+  /**
+   * Increment a statistic by a specific value.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementStatistic(Statistic statistic, long count) {
+    statisticsContext.incrementCounter(statistic, count);
+  }
+
+  /**
+   * Decrement a gauge by a specific value.
+   * @param statistic The operation to decrement
+   * @param count the count to decrement
+   */
+  protected void decrementGauge(Statistic statistic, long count) {
+    statisticsContext.decrementGauge(statistic, count);
+  }
+
+  /**
+   * Increment a gauge by a specific value.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementGauge(Statistic statistic, long count) {
+    statisticsContext.incrementGauge(statistic, count);
+  }
+
+  /**
+   * Callback when an operation was retried.
+   * Increments the statistics of ignored errors or throttled requests,
+   * depending up on the exception class.
+   * @param ex exception.
+   */
+  public void operationRetried(Exception ex) {
+    if (isThrottleException(ex)) {
+      LOG.debug("Request throttled");
+      incrementStatistic(STORE_IO_THROTTLED);
+      statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
+    } else {
+      incrementStatistic(STORE_IO_RETRY);
+      incrementStatistic(IGNORED_ERRORS);
+    }
+  }
+
+  /**
+   * Callback from {@link Invoker} when an operation is retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param retries number of retries
+   * @param idempotent is the method idempotent
+   */
+  public void operationRetried(String text, Exception ex, int retries, boolean 
idempotent) {
+    operationRetried(ex);
+  }
+
+  /**
+   * Get the instrumentation's IOStatistics.
+   * @return statistics
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return instrumentation.getIOStatistics();
+  }
+
+  /**
+   * Start an operation; this informs the audit service of the event
+   * and then sets it as the active span.
+   * @param operation operation name.
+   * @param path1 first path of operation
+   * @param path2 second path of operation
+   * @return a span for the audit
+   * @throws IOException failure
+   */
+  public AuditSpanS3A createSpan(String operation, @Nullable String path1, 
@Nullable String path2)
+      throws IOException {
+
+    return auditSpanSource.createSpan(operation, path1, path2);
+  }
+
+  /**
+   * Reject any request to delete an object where the key is root.
+   * @param key key to validate
+   * @throws IllegalArgumentException if the request was rejected due to
+   * a mistaken attempt to delete the root directory.
+   */
+  private void blockRootDelete(String key) throws IllegalArgumentException {
+    checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be 
deleted", bucket);
+  }
+
+  /**

Review Comment:
   cut this as the interface is all we need to document.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation.
+ */
+public abstract class AbstractContractBulkDeleteTest extends 
AbstractFSContractTestBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);
+
+    protected int pageSize;

Review Comment:
   mention that this is determined by asking the filesystem instance



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation for S3A Implementation.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ITestS3AContractBulkDelete.class);
+
+    /**
+     * Delete Page size: {@value}.
+     * This is the default page size for bulk delete operation for this 
contract test.
+     * All the tests in this class should pass number of paths equal to or 
less than
+     * this page size during the bulk delete operation.
+     */
+    private static final int DELETE_PAGE_SIZE = 20;
+
+    private final boolean enableMultiObjectDelete;
+
+    @Parameterized.Parameters(name = "enableMultiObjectDelete = {0}")
+    public static Iterable<Object[]> enableMultiObjectDelete() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+        });
+    }
+
+    public ITestS3AContractBulkDelete(boolean enableMultiObjectDelete) {
+        this.enableMultiObjectDelete = enableMultiObjectDelete;
+    }
+
+    @Override
+    protected Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+        S3ATestUtils.disableFilesystemCaching(conf);
+        S3ATestUtils.removeBaseAndBucketOverrides(conf,

Review Comment:
   I think if bulk delete is disabled for a bucket, we should just skip the 
bulk delete set of tests. otherwise test runs against GCS will fail.



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md:
##########
@@ -94,6 +94,85 @@ on the client requirements.
 </property>
 ```
 
+## <a name="bulkdelete"></a> Improving delete performance through bulkdelete 
API.
+
+For bulk delete API spec refer to [BulkDelete](bulkdelete.html).
+
+The S3A client exports this API.
+
+### S3A Implementation of Bulk Delete.
+If multi-object delete is enabled (`fs.s3a.multiobjectdelete.enable` = true), 
as
+it is by default, then the page size is limited to that defined in
+`fs.s3a.bulk.delete.page.size`, which MUST be less than or equal to 1000.
+* The entire list of paths to delete is aggregated into a single bulk delete 
request,
+  issued to the store.
+* Provided the caller has the correct permissions, every entry in the list
+  will, if the path references an object, cause that object to be deleted.
+* If the path does not reference an object: the path will not be deleted
+  "This is for deleting objects, not directories"
+* No probes for the existence of parent directories will take place; no
+  parent directory markers will be created.
+  "If you need parent directories, call mkdir() yourself"
+* The failed keys listed in the `DeleteObjectsResponse` response
+  are converted into paths and returned along with their error messages.
+* Network and other IO errors are raised as exceptions.
+
+If multi-object delete is disabled (or the list is of size 1):
+* A single `DELETE` call is issued
+* Any `AccessDeniedException` raised is converted to a result in the error 
list.
+* Any 404 response from a (non-AWS) store will be ignored.
+* Network and other IO errors are raised as exceptions.
+
+Because there are no probes to ensure the call does not overwrite a directory,
+or to see if a parent directory marker needs to be created,
+this API is still faster than issuing a normal `FileSystem.delete(path)` call.
+
+That is: all the overhead normally undertaken to preserve the POSIX filesystem 
model is omitted.
+
+
+### S3 Scalability and Performance
+
+Every entry in a bulk delete request counts as one write operation
+against AWS S3 storage.
+With the default write rate under a prefix on AWS S3 Standard storage
+restricted to 3,500 writes/second, it is very easy to overload
+the store by issuing a few bulk delete requests simultaneously.
+
+* If throttling is triggered then all clients interacting with
+  the store may observe performance issues.
+* The write quota applies even for paths which do not exist.
+* The S3A client *may* perform rate throttling as well as page size limiting.
+
+What does that mean? It means that attempting to issue multiple
+bulk delete calls in parallel can be counterproductive.
+
+When overloaded, the S3 store returns a 503 throttle response.
+This will trigger a backoff and retry of the request.
+However, the repeated request will still include the same number of objects and
+*so generate the same load*.
+
+This can lead to a pathological situation where the repeated requests will
+never be satisfied because the request itself is sufficient to overload the 
store.
+See [HADOOP-16823. Large DeleteObject requests are their own Thundering 
Herd](https://issues.apache.org/jira/browse/HADOOP-16823)
+for an example of where this did actually surface in production.
+
+This is why the default page size of S3A clients is 250 paths, not the store 
limit of 1000 entries.
+It is also why the S3A delete/rename operations do not attempt to do massive 
parallel deletions.
+Instead, bulk delete requests are queued for a single blocking thread to issue.
+Consider a similar design.
+
+
+When working with versioned S3 buckets, every path deleted will add a 
tombstone marker
+to the store at that location, even if there was no object at that path.
+While this has no negative performance impact on the bulk delete call,
+it will slow down list requests subsequently made against that path.
+That is: bulk delete requests of paths which do not exist will hurt future 
queries.
+
+Avoid this. Note also that TPC-DS benchmarks do not create the right load to 
make these
+performance problems observable, but they can surface in production.
+* Configure buckets to have a limited number of days for tombstones to be 
preserved.
+* Do not delete paths which you know do not contain objects.

Review Comment:
   "do not delete paths which you know reference nonexistent files or 
directories"



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation.
+ */
+public abstract class AbstractContractBulkDeleteTest extends 
AbstractFSContractTestBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);
+
+    protected int pageSize;
+
+    protected Path basePath;
+
+    protected FileSystem fs;
+
+    @Before
+    public void setUp() throws Exception {
+        fs = getFileSystem();
+        basePath = path(getClass().getName());
+        pageSize = WrappedIO.bulkDeletePageSize(getFileSystem(), basePath);
+        fs.mkdirs(basePath);
+    }
+
+    public Path getBasePath() {
+        return basePath;
+    }
+
+    /**
+     * Validate the page size for bulk delete operation. Different stores can 
have different
+     * implementations for bulk delete operation thus different page size.
+     */
+    @Test
+    public void validatePageSize() throws Exception {
+        Assertions.assertThat(pageSize)
+                .describedAs("Page size should be 1 by default for all stores")
+                .isEqualTo(1);
+    }
+
+    @Test
+    public void testPathsSizeEqualsPageSizePrecondition() throws Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
+        // Bulk delete call should pass with no exception.
+        bulkDelete(getFileSystem(), basePath, listOfPaths);
+    }
+
+    @Test
+    public void testPathsSizeGreaterThanPageSizePrecondition() throws 
Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize + 1, basePath);
+        intercept(IllegalArgumentException.class,
+                () -> bulkDelete(getFileSystem(), basePath, listOfPaths));
+    }
+
+    @Test
+    public void testPathsSizeLessThanPageSizePrecondition() throws Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize - 1, basePath);
+        // Bulk delete call should pass with no exception.
+        bulkDelete(getFileSystem(), basePath, listOfPaths);
+    }
+
+    @Test
+    public void testBulkDeleteSuccessful() throws Exception {
+        List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
+        for (Path path : listOfPaths) {
+            touch(fs, path);
+        }
+        FileStatus[] fileStatuses = fs.listStatus(basePath);
+        Assertions.assertThat(fileStatuses)
+                .describedAs("File count after create")
+                .hasSize(pageSize);
+        assertSuccessfulBulkDelete(
+            bulkDelete(getFileSystem(), basePath, listOfPaths));
+        FileStatus[] fileStatusesAfterDelete = fs.listStatus(basePath);
+        Assertions.assertThat(fileStatusesAfterDelete)
+                .describedAs("File statuses should be empty after delete")
+                .isEmpty();
+    }
+
+    @Test
+    public void validatePathCapabilityDeclared() throws Exception {
+        Assertions.assertThat(fs.hasPathCapability(basePath, 
CommonPathCapabilities.BULK_DELETE))
+                .describedAs("Path capability BULK_DELETE should be declared")
+                .isTrue();
+    }
+
+    @Test
+    public void testDeletePathsNotUnderBase() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path pathNotUnderBase = path("not-under-base");
+        paths.add(pathNotUnderBase);
+        // Should fail as path is not under the base path.
+        intercept(IllegalArgumentException.class,
+                () -> bulkDelete(getFileSystem(), basePath, paths));
+    }
+
+    @Test
+    public void testDeletePathsNotAbsolute() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path pathNotAbsolute = new Path("not-absolute");
+        paths.add(pathNotAbsolute);
+        // Should fail as path is not absolute.
+        intercept(IllegalArgumentException.class,
+                () -> bulkDelete(getFileSystem(), basePath, paths));
+    }
+
+    @Test
+    public void testDeletePathsNotExists() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path pathNotExists = new Path(basePath, "not-exists");
+        paths.add(pathNotExists);
+        // bulk delete call doesn't verify if a path exist or not before 
deleting.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeletePathsDirectory() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path dirPath = new Path(basePath, "dir");
+        fs.mkdirs(dirPath);
+        paths.add(dirPath);
+        Path filePath = new Path(dirPath, "file");
+        touch(fs, filePath);
+        paths.add(filePath);
+        pageSizePreconditionForTest(paths.size());
+        // Outcome is undefined. But call shouldn't fail. In case of S3 
directories will still be present.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeleteEmptyDirectory() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path emptyDirPath = new Path(basePath, "empty-dir");
+        fs.mkdirs(emptyDirPath);
+        paths.add(emptyDirPath);
+        // Should pass as empty directory.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeleteEmptyList() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        // Empty list should pass.
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    @Test
+    public void testDeleteSamePathsMoreThanOnce() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path path = new Path(basePath, "file");
+        touch(fs, path);
+        paths.add(path);
+        paths.add(path);
+        Path another = new Path(basePath, "another-file");
+        touch(fs, another);
+        paths.add(another);
+        pageSizePreconditionForTest(paths.size());
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+    }
+
+    /**
+     * Skip test if paths size is greater than page size.
+     */
+    protected void pageSizePreconditionForTest(int size) {
+        if (size > pageSize) {
+            skip("Test requires paths size less than or equal to page size");

Review Comment:
   add page size in message?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Store Layer.
+ * This is where lower level storage operations are intended
+ * to move.
+ */
+public class S3AStoreImpl implements S3AStore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(S3AStoreImpl.class);
+
+  private final StoreContextFactory storeContextFactory;
+
+  private final S3Client s3Client;
+
+  private final String bucket;
+
+  private final RequestFactory requestFactory;
+
+  /** Async client is used for transfer manager. */
+  private S3AsyncClient s3AsyncClient;
+
+  private final DurationTrackerFactory durationTrackerFactory;
+
+  /** The core instrumentation. */
+  private final S3AInstrumentation instrumentation;
+
+  /** Accessors to statistics for this FS. */
+  private final S3AStatisticsContext statisticsContext;
+
+  /** Storage Statistics Bonded to the instrumentation. */
+  private final S3AStorageStatistics storageStatistics;
+
+  private final RateLimiting readRateLimiter;
+
+  private final RateLimiting writeRateLimiter;
+
+  private final StoreContext storeContext;
+
+  private final Invoker invoker;
+
+  private final AuditSpanSource<AuditSpanS3A> auditSpanSource;
+
+  S3AStoreImpl(StoreContextFactory storeContextFactory,
+      S3Client s3Client,
+      DurationTrackerFactory durationTrackerFactory,
+      S3AInstrumentation instrumentation,
+      S3AStatisticsContext statisticsContext,
+      S3AStorageStatistics storageStatistics,
+      RateLimiting readRateLimiter,
+      RateLimiting writeRateLimiter,
+      AuditSpanSource<AuditSpanS3A> auditSpanSource) {
+    this.storeContextFactory = requireNonNull(storeContextFactory);
+    this.s3Client = requireNonNull(s3Client);
+    this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+    this.instrumentation = requireNonNull(instrumentation);
+    this.statisticsContext = requireNonNull(statisticsContext);
+    this.storageStatistics = requireNonNull(storageStatistics);
+    this.readRateLimiter = requireNonNull(readRateLimiter);
+    this.writeRateLimiter = requireNonNull(writeRateLimiter);
+    this.auditSpanSource = requireNonNull(auditSpanSource);
+    this.storeContext = 
requireNonNull(storeContextFactory.createStoreContext());
+    this.invoker = storeContext.getInvoker();
+    this.bucket = storeContext.getBucket();
+    this.requestFactory = storeContext.getRequestFactory();
+  }
+
+  @Override
+  public Duration acquireWriteCapacity(final int capacity) {
+    return writeRateLimiter.acquire(capacity);
+  }
+
+  @Override
+  public Duration acquireReadCapacity(final int capacity) {
+    return readRateLimiter.acquire(capacity);
+
+  }
+
+  /**
+   * Create the store context.
+   * @return a new store context.
+   */
+  private StoreContext createStoreContext() {
+    return storeContextFactory.createStoreContext();
+  }
+
+  @Override
+  public StoreContext getStoreContext() {
+    return storeContext;
+  }
+
+  private S3Client getS3Client() {
+    return s3Client;
+  }
+
+  @Override
+  public DurationTrackerFactory getDurationTrackerFactory() {
+    return durationTrackerFactory;
+  }
+
+  private S3AInstrumentation getInstrumentation() {
+    return instrumentation;
+  }
+
+  @Override
+  public S3AStatisticsContext getStatisticsContext() {
+    return statisticsContext;
+  }
+
+  private S3AStorageStatistics getStorageStatistics() {
+    return storageStatistics;
+  }
+
+  @Override
+  public RequestFactory getRequestFactory() {
+    return requestFactory;
+  }
+
+  /**
+   * Increment a statistic by 1.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   */
+  protected void incrementStatistic(Statistic statistic) {
+    incrementStatistic(statistic, 1);
+  }
+
+  protected void incrementDurationStatistic(Statistic statistic, Duration 
duration) {
+    statisticsContext.addValueToQuantiles(statistic, duration.toMillis());
+  }
+
+  /**
+   * Increment a statistic by a specific value.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementStatistic(Statistic statistic, long count) {
+    statisticsContext.incrementCounter(statistic, count);
+  }
+
+  /**
+   * Decrement a gauge by a specific value.
+   * @param statistic The operation to decrement
+   * @param count the count to decrement
+   */
+  protected void decrementGauge(Statistic statistic, long count) {
+    statisticsContext.decrementGauge(statistic, count);
+  }
+
+  /**
+   * Increment a gauge by a specific value.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementGauge(Statistic statistic, long count) {
+    statisticsContext.incrementGauge(statistic, count);
+  }
+
+  /**
+   * Callback when an operation was retried.
+   * Increments the statistics of ignored errors or throttled requests,
+   * depending up on the exception class.
+   * @param ex exception.
+   */
+  public void operationRetried(Exception ex) {
+    if (isThrottleException(ex)) {
+      LOG.debug("Request throttled");
+      incrementStatistic(STORE_IO_THROTTLED);
+      statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
+    } else {
+      incrementStatistic(STORE_IO_RETRY);
+      incrementStatistic(IGNORED_ERRORS);
+    }
+  }
+
+  /**
+   * Callback from {@link Invoker} when an operation is retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param retries number of retries
+   * @param idempotent is the method idempotent
+   */
+  public void operationRetried(String text, Exception ex, int retries, boolean 
idempotent) {
+    operationRetried(ex);
+  }
+
+  /**
+   * Get the instrumentation's IOStatistics.
+   * @return statistics
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return instrumentation.getIOStatistics();
+  }
+
+  /**
+   * Start an operation; this informs the audit service of the event
+   * and then sets it as the active span.
+   * @param operation operation name.
+   * @param path1 first path of operation
+   * @param path2 second path of operation
+   * @return a span for the audit
+   * @throws IOException failure
+   */
+  public AuditSpanS3A createSpan(String operation, @Nullable String path1, 
@Nullable String path2)
+      throws IOException {
+
+    return auditSpanSource.createSpan(operation, path1, path2);
+  }
+
+  /**
+   * Reject any request to delete an object where the key is root.
+   * @param key key to validate
+   * @throws IllegalArgumentException if the request was rejected due to
+   * a mistaken attempt to delete the root directory.
+   */
+  private void blockRootDelete(String key) throws IllegalArgumentException {
+    checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be 
deleted", bucket);
+  }
+
+  /**
+   * Perform a bulk object delete operation against S3.
+   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+   * operation statistics
+   * <p>
+   * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+   * of objects deleted in the request.
+   * <p>
+   * Retry policy: retry untranslated; delete considered idempotent.
+   * If the request is throttled, this is logged in the throttle statistics,
+   * with the counter set to the number of keys, rather than the number
+   * of invocations of the delete operation.
+   * This is because S3 considers each key as one mutating operation on
+   * the store when updating its load counters on a specific partition
+   * of an S3 bucket.
+   * If only the request was measured, this operation would under-report.
+   * @param deleteRequest keys to delete on the s3-backend
+   * @return the AWS response
+   * @throws IllegalArgumentException if the request was rejected due to
+   * a mistaken attempt to delete the root directory
+   * @throws SdkException amazon-layer failure.
+   */
+  @Override
+  @Retries.RetryRaw
+  public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(final 
DeleteObjectsRequest deleteRequest)
+      throws SdkException {
+
+    DeleteObjectsResponse response;
+    BulkDeleteRetryHandler retryHandler = new 
BulkDeleteRetryHandler(createStoreContext());
+
+    final List<ObjectIdentifier> keysToDelete = 
deleteRequest.delete().objects();
+    int keyCount = keysToDelete.size();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initiating delete operation for {} objects", 
keysToDelete.size());
+      keysToDelete.stream().forEach(objectIdentifier -> {
+        LOG.debug(" \"{}\" {}", objectIdentifier.key(),
+            objectIdentifier.versionId() != null ? 
objectIdentifier.versionId() : "");
+      });
+    }
+    // block root calls
+    
keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete);
+
+    try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", 
keyCount)) {
+      response =
+          invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT, 
(text, e, r, i) -> {
+                // handle the failure
+                retryHandler.bulkDeleteRetried(deleteRequest, e);
+              },
+              // duration is tracked in the bulk delete counters
+              trackDurationOfOperation(getDurationTrackerFactory(),
+                  OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
+                        Duration durationToAcquireWriteCapacity = 
acquireWriteCapacity(keyCount);
+                        instrumentation.recordDuration(STORE_IO_RATE_LIMITED, 
true, durationToAcquireWriteCapacity);
+                        incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
+                    return s3Client.deleteObjects(deleteRequest);
+                  }));
+      if (!response.errors().isEmpty()) {
+        // one or more of the keys could not be deleted.
+        // log and then throw
+        List<S3Error> errors = response.errors();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partial failure of delete, {} errors", errors.size());
+          for (S3Error error : errors) {
+            LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), 
error.message());
+          }
+        }
+      }
+      d.close();
+      return Tuples.pair(d.asDuration(), response);
+
+    } catch (IOException e) {
+      // this is part of the retry signature, nothing else.
+      // convert to unchecked.
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**

Review Comment:
   again, cut



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation for S3A Implementation.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ITestS3AContractBulkDelete.class);
+
+    /**
+     * Delete Page size: {@value}.
+     * This is the default page size for bulk delete operation for this 
contract test.
+     * All the tests in this class should pass number of paths equal to or 
less than
+     * this page size during the bulk delete operation.
+     */
+    private static final int DELETE_PAGE_SIZE = 20;
+
+    private final boolean enableMultiObjectDelete;
+
+    @Parameterized.Parameters(name = "enableMultiObjectDelete = {0}")
+    public static Iterable<Object[]> enableMultiObjectDelete() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+        });
+    }
+
+    public ITestS3AContractBulkDelete(boolean enableMultiObjectDelete) {
+        this.enableMultiObjectDelete = enableMultiObjectDelete;
+    }
+
+    @Override
+    protected Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+        S3ATestUtils.disableFilesystemCaching(conf);
+        S3ATestUtils.removeBaseAndBucketOverrides(conf,
+                Constants.BULK_DELETE_PAGE_SIZE);
+        conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE);
+        conf.setBoolean(Constants.ENABLE_MULTI_DELETE, 
enableMultiObjectDelete);
+        return conf;
+    }
+
+    @Override
+    protected AbstractFSContract createContract(Configuration conf) {
+        return new S3AContract(createConfiguration());
+    }
+
+    @Override
+    public void validatePageSize() throws Exception {
+        int targetPageSize = DELETE_PAGE_SIZE;
+        if (!enableMultiObjectDelete) {
+            // if multi-object delete is disabled, page size should be 1.
+            targetPageSize = 1;
+        }
+        Assertions.assertThat(pageSize)
+                .describedAs("Page size should match the configured page size")
+                .isEqualTo(targetPageSize);
+    }
+
+    @Test
+    public void testBulkDeleteZeroPageSizePrecondition() throws Exception {
+        if(!enableMultiObjectDelete) {
+            // if multi-object delete is disabled, skip this test as
+            // page size is always 1.
+            skip("Multi-object delete is disabled");
+        }
+        Configuration conf = getContract().getConf();
+        conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, 0);
+        Path testPath = path(getMethodName());
+        try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+            intercept(IllegalArgumentException.class,
+                    () -> fs.createBulkDelete(testPath));
+        }
+    }
+
+    @Test
+    public void testPageSizeWhenMultiObjectsDisabled() throws Exception {
+        Configuration conf = getContract().getConf();
+        conf.setBoolean(Constants.ENABLE_MULTI_DELETE, false);
+        Path testPath = path(getMethodName());
+        try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+            BulkDelete bulkDelete = fs.createBulkDelete(testPath);
+            Assertions.assertThat(bulkDelete.pageSize())
+                    .describedAs("Page size should be 1 when multi-object 
delete is disabled")
+                    .isEqualTo(1);
+        }
+    }
+
+    @Override
+    public void testDeletePathsDirectory() throws Exception {
+        List<Path> paths = new ArrayList<>();
+        Path dirPath = new Path(basePath, "dir");
+        fs.mkdirs(dirPath);
+        paths.add(dirPath);
+        Path filePath = new Path(dirPath, "file");
+        touch(fs, filePath);
+        paths.add(filePath);
+        pageSizePreconditionForTest(paths.size());
+        assertSuccessfulBulkDelete(bulkDelete(getFileSystem(), basePath, 
paths));
+        // During the bulk delete operation, the directories are not deleted 
in S3A.
+        assertIsDirectory(dirPath);
+    }
+
+    @Test
+    public void testRateLimiting() throws Exception {
+        Configuration conf = getContract().getConf();

Review Comment:
   Best to skip this test when multi-object delete is disabled (single delete); it is too hard to generate the load reliably.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation.
+ */
+public abstract class AbstractContractBulkDeleteTest extends 
AbstractFSContractTestBase {

Review Comment:
   Add a checksum file implementation test too, so that Yetus runs it.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java:
##########
@@ -702,6 +709,117 @@ public void testPartialDeleteSingleDelete() throws 
Throwable {
     executePartialDelete(createAssumedRoleConfig(), true);
   }
 
+  @Test
+  public void testBulkDeleteOnReadOnlyAccess() throws Throwable {
+    describe("Bulk delete with part of the child tree read only");
+    executeBulkDeleteOnReadOnlyFiles(createAssumedRoleConfig());
+  }
+
+  @Test
+  public void testBulkDeleteWithReadWriteAccess() throws Throwable {
+    describe("Bulk delete with read write access");
+    executeBulkDeleteOnSomeReadOnlyFiles(createAssumedRoleConfig());
+  }
+
+  /**
+   * Execute bulk delete on read only files and some read write files.
+   */
+  private void executeBulkDeleteOnReadOnlyFiles(Configuration 
assumedRoleConfig) throws Exception {
+    Path destDir = methodPath();
+    Path readOnlyDir = new Path(destDir, "readonlyDir");
+
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    WrappedIO.bulkDelete(fs, destDir, new ArrayList<>());
+
+    bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
+    roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);
+
+    int range = 10;
+    touchFiles(fs, readOnlyDir, range);
+    touchFiles(roleFS, destDir, range);
+    FileStatus[] fileStatuses = roleFS.listStatus(readOnlyDir);
+    List<Path> pathsToDelete = Arrays.stream(fileStatuses)
+            .map(FileStatus::getPath)
+            .collect(Collectors.toList());
+    // bulk delete in the read only FS should fail.
+    BulkDelete bulkDelete = roleFS.createBulkDelete(readOnlyDir);
+    assertAccessDeniedForEachPath(bulkDelete.bulkDelete(pathsToDelete));
+    BulkDelete bulkDelete2 = roleFS.createBulkDelete(destDir);
+    assertAccessDeniedForEachPath(bulkDelete2.bulkDelete(pathsToDelete));
+    // delete the files in the original FS should succeed.
+    BulkDelete bulkDelete3 = fs.createBulkDelete(readOnlyDir);
+    assertSuccessfulBulkDelete(bulkDelete3.bulkDelete(pathsToDelete));
+    FileStatus[] fileStatusesUnderDestDir = roleFS.listStatus(destDir);
+    List<Path> pathsToDeleteUnderDestDir = 
Arrays.stream(fileStatusesUnderDestDir)
+            .map(FileStatus::getPath)
+            .collect(Collectors.toList());
+    BulkDelete bulkDelete4 = fs.createBulkDelete(destDir);
+    
assertSuccessfulBulkDelete(bulkDelete4.bulkDelete(pathsToDeleteUnderDestDir));
+  }
+
+  /**
+   * Execute bulk delete on some read only files and some read write files.
+   */
+  private void executeBulkDeleteOnSomeReadOnlyFiles(Configuration 
assumedRoleConfig)
+          throws IOException {
+    Path destDir = methodPath();

Review Comment:
   Skip this test when the page size is 1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to