steveloughran commented on code in PR #6726: URL: https://github.com/apache/hadoop/pull/6726#discussion_r1584787287
########## hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md: ########## @@ -0,0 +1,136 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# <a name="BulkDelete"></a> interface `BulkDelete` + +<!-- MACRO{toc|fromDepth=1|toDepth=2} --> + +The `BulkDelete` interface provides an API to perform bulk delete of files/objects +in an object store or filesystem. + +## Key Features + +* An API for submitting a list of paths to delete. +* This list must be no larger than the "page size" supported by the client; This size is also exposed as a method. +* Triggers a request to delete files at the specific paths. +* Returns a list of which paths were reported as delete failures by the store. +* Does not consider a nonexistent file to be a failure. +* Does not offer any atomicity guarantees. +* Idempotency guarantees are weak: retries may delete files newly created by other clients. +* Provides no guarantees as to the outcome if a path references a directory. +* Provides no guarantees that parent directories will exist after the call. + + +The API is designed to match the semantics of the AWS S3 [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) REST API call, but it is not +exclusively restricted to this store. This is why the "provides no guarantees" +restrictions do not state what the outcome will be when executed on other stores. + +### Interface `org.apache.hadoop.fs.BulkDeleteSource` + +The interface `BulkDeleteSource` is offered by a FileSystem/FileContext class if +it supports the API. + +```java +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDeleteSource { + default BulkDelete createBulkDelete(Path path) + throws UnsupportedOperationException, IllegalArgumentException, IOException; + +} + +``` + +### Interface `org.apache.hadoop.fs.BulkDelete` + +This is the bulk delete implementation returned by the `createBulkDelete()` call. + +```java +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDelete extends IOStatisticsSource, Closeable { + int pageSize(); + Path basePath(); + List<Map.Entry<Path, String>> bulkDelete(List<Path> paths) + throws IOException, IllegalArgumentException; + +} + +``` + +### `bulkDelete(paths)` + +#### Preconditions + +```python +if length(paths) > pageSize: throw IllegalArgumentException +``` + +#### Postconditions + +All paths which refer to files are removed from the set of files. +```python +FS'Files = FS.Files - [paths] +``` + +No other restrictions are placed upon the outcome. + + +### Availability + +The `BulkDeleteSource` interface is exported by `FileSystem` and `FileContext` storage clients +which is available for all FS via `org.apache.hadoop.fs.DefalutBulkDeleteSource`. 
For the +ICEBERG integration to work seamlessly, all FS which supports delete() MUST leave the Review Comment: say "for integration in applications like Apache Iceberg", all implementations of this interface MUST NOT reject the request but instead return a BulkDelete instance of size >= 1" ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefaultBulkDeleteOperation.java: ########## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths; + +/** + * Default implementation of the {@link BulkDelete} interface. + */ +public class DefaultBulkDeleteOperation implements BulkDelete { + + private static Logger LOG = LoggerFactory.getLogger(DefaultBulkDeleteOperation.class); + + /** Default page size for bulk delete. */ + private static final int DEFAULT_PAGE_SIZE = 1; + + /** Base path for the bulk delete operation. */ + private final Path basePath; + + /** Delegate File system make actual delete calls. */ + private final FileSystem fs; + + public DefaultBulkDeleteOperation(Path basePath, + FileSystem fs) { + this.basePath = requireNonNull(basePath); + this.fs = fs; + } + + @Override + public int pageSize() { + return DEFAULT_PAGE_SIZE; + } + + @Override + public Path basePath() { + return basePath; + } + + /** + * {@inheritDoc} + */ + @Override + public List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths) + throws IOException, IllegalArgumentException { + validateBulkDeletePaths(paths, DEFAULT_PAGE_SIZE, basePath); + List<Map.Entry<Path, String>> result = new ArrayList<>(); + if (!paths.isEmpty()) { + // As the page size is always 1, this should be the only one + // path in the collection. + Path pathToDelete = paths.iterator().next(); + try { + boolean deleted = fs.delete(pathToDelete, false); + if (deleted) { Review Comment: We can cut this. from the spec ``` Except in the special case of the root directory, if this API call completed successfully then there is nothing at the end of the path. That is: the outcome is desired. The return flag simply tells the caller whether or not any change was made to the state of the filesystem. ``` instead we can just keep lines 97-99. 
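One possible shape, as a sketch only (the exact lines to keep are whatever "97-99" are in the current file; the point is to trust the `delete()` contract rather than branch on its return value):

```java
// sketch: if delete() returns at all, nothing is left at the path, which is
// the outcome we wanted; only a raised IOException is worth reporting back.
try {
  fs.delete(pathToDelete, false);
} catch (IOException e) {
  LOG.debug("Couldn't delete {} - exception occurred", pathToDelete, e);
  result.add(Tuples.pair(pathToDelete, e.toString()));
}
```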
That should suffice ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java: ########## @@ -283,7 +287,8 @@ @InterfaceStability.Evolving public class S3AFileSystem extends FileSystem implements StreamCapabilities, AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource, - AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A> { + AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A>, + BulkDeleteSource, StoreContextFactory { Review Comment: we can pull that extends now the superclass declares it ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefaultBulkDeleteOperation.java: ########## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths; + +/** + * Default implementation of the {@link BulkDelete} interface. + */ +public class DefaultBulkDeleteOperation implements BulkDelete { + + private static Logger LOG = LoggerFactory.getLogger(DefaultBulkDeleteOperation.class); + + /** Default page size for bulk delete. */ + private static final int DEFAULT_PAGE_SIZE = 1; + + /** Base path for the bulk delete operation. */ + private final Path basePath; + + /** Delegate File system make actual delete calls. */ + private final FileSystem fs; + + public DefaultBulkDeleteOperation(Path basePath, + FileSystem fs) { + this.basePath = requireNonNull(basePath); + this.fs = fs; + } + + @Override + public int pageSize() { + return DEFAULT_PAGE_SIZE; + } + + @Override + public Path basePath() { + return basePath; + } + + /** + * {@inheritDoc} + */ + @Override + public List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths) + throws IOException, IllegalArgumentException { + validateBulkDeletePaths(paths, DEFAULT_PAGE_SIZE, basePath); + List<Map.Entry<Path, String>> result = new ArrayList<>(); + if (!paths.isEmpty()) { + // As the page size is always 1, this should be the only one + // path in the collection. 
+ Path pathToDelete = paths.iterator().next(); + try { + boolean deleted = fs.delete(pathToDelete, false); + if (deleted) { + return result; + } else { + try { + FileStatus fileStatus = fs.getFileStatus(pathToDelete); + if (fileStatus.isDirectory()) { + result.add(Tuples.pair(pathToDelete, "Path is a directory")); + } + } catch (FileNotFoundException e) { + // Ignore FNFE and don't add to the result list. + LOG.debug("Couldn't delete {} - does not exist: {}", pathToDelete, e.toString()); + } catch (IOException e) { + LOG.debug("Couldn't delete {} - exception occurred: {}", pathToDelete, e.toString()); Review Comment: have debug log full stack trace ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefaultBulkDeleteOperation.java: ########## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; Review Comment: put into fs.impl ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java: ########## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Error; + +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.store.audit.AuditSpanSource; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException; +import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Store Layer. + * This is where lower level storage operations are intended + * to move. + */ +public class S3AStoreImpl implements S3AStore { + + private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class); + + /** Factory to create store contexts. */ + private final StoreContextFactory storeContextFactory; + + /** The S3 client used to communicate with S3 bucket. */ + private final S3Client s3Client; + + /** The S3 bucket to communicate with. */ + private final String bucket; + + /** Request factory for creating requests. */ + private final RequestFactory requestFactory; + + /** Async client is used for transfer manager. */ + private S3AsyncClient s3AsyncClient; + + /** Duration tracker factory. */ + private final DurationTrackerFactory durationTrackerFactory; + + /** The core instrumentation. */ + private final S3AInstrumentation instrumentation; + + /** Accessors to statistics for this FS. */ + private final S3AStatisticsContext statisticsContext; + + /** Storage Statistics Bonded to the instrumentation. */ + private final S3AStorageStatistics storageStatistics; + + /** Rate limiter for read operations. 
*/ + private final RateLimiting readRateLimiter; + + /** Rate limiter for write operations. */ + private final RateLimiting writeRateLimiter; + + /** Store context. */ + private final StoreContext storeContext; + + /** Invoker for retry operations. */ + private final Invoker invoker; + + /** Audit span source. */ + private final AuditSpanSource<AuditSpanS3A> auditSpanSource; + + /** Constructor to create S3A store. */ + S3AStoreImpl(StoreContextFactory storeContextFactory, + S3Client s3Client, + DurationTrackerFactory durationTrackerFactory, + S3AInstrumentation instrumentation, + S3AStatisticsContext statisticsContext, + S3AStorageStatistics storageStatistics, + RateLimiting readRateLimiter, + RateLimiting writeRateLimiter, + AuditSpanSource<AuditSpanS3A> auditSpanSource) { + this.storeContextFactory = requireNonNull(storeContextFactory); + this.s3Client = requireNonNull(s3Client); + this.durationTrackerFactory = requireNonNull(durationTrackerFactory); + this.instrumentation = requireNonNull(instrumentation); + this.statisticsContext = requireNonNull(statisticsContext); + this.storageStatistics = requireNonNull(storageStatistics); + this.readRateLimiter = requireNonNull(readRateLimiter); + this.writeRateLimiter = requireNonNull(writeRateLimiter); + this.auditSpanSource = requireNonNull(auditSpanSource); + this.storeContext = requireNonNull(storeContextFactory.createStoreContext()); + this.invoker = storeContext.getInvoker(); + this.bucket = storeContext.getBucket(); + this.requestFactory = storeContext.getRequestFactory(); + } + + /** Acquire write capacity for rate limiting {@inheritDoc}. */ + @Override + public Duration acquireWriteCapacity(final int capacity) { + return writeRateLimiter.acquire(capacity); + } + + /** Acquire read capacity for rate limiting {@inheritDoc}. */ + @Override + public Duration acquireReadCapacity(final int capacity) { + return readRateLimiter.acquire(capacity); + + } + + /** + * Create the store context. Review Comment: lets change to "create a new store context" ########## hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md: ########## @@ -94,6 +94,85 @@ on the client requirements. </property> ``` +## <a name="bulkdelete"></a> Improving delete performance through bulkdelete API. + +For bulk delete API spec refer to File System specification. [BulkDelete](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/bulkdelete.html) + +The S3A client exports this API. + +### S3A Implementation of Bulk Delete. +If multi-object delete is enabled (`fs.s3a.multiobjectdelete.enable` = true), as +it is by default, then the page size is limited to that defined in +`fs.s3a.bulk.delete.page.size`, which MUST be less than or equal to 1000. +* The entire list of paths to delete is aggregated into a single bulk delete request, + issued to the store. +* Provided the caller has the correct permissions, every entry in the list + will, if the path references an object, cause that object to be deleted. +* If the path does not reference an object: the path will not be deleted + "This is for deleting objects, not directories" +* No probes for the existence of parent directories will take place; no + parent directory markers will be created. + "If you need parent directories, call mkdir() yourself" +* The list of failed keys listed in the `DeleteObjectsResponse` response + are converted into paths and returned along with their error messages. +* Network and other IO errors are raised as exceptions. 
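To show the caller side of this API, here is a minimal sketch against the `BulkDeleteSource`/`BulkDelete` interfaces from the filesystem spec above (illustrative only, not part of this patch; the class and method names are invented for the example):

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.BulkDeleteSource;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative caller of the bulk delete API. */
public final class BulkDeleteExample {

  private BulkDeleteExample() {
  }

  /**
   * Delete a single page of paths under {@code base}.
   * @return the (path, error) pairs reported by the store; an empty list
   * means there is nothing left at any of the paths.
   */
  public static List<Map.Entry<Path, String>> deleteOnePage(
      FileSystem fs, Path base, List<Path> paths) throws IOException {
    try (BulkDelete bulkDelete = ((BulkDeleteSource) fs).createBulkDelete(base)) {
      if (paths.size() > bulkDelete.pageSize()) {
        // callers must split their requests into pages no larger than this;
        // on S3A the limit comes from fs.s3a.bulk.delete.page.size.
        throw new IllegalArgumentException("too many paths for one page: " + paths.size());
      }
      return bulkDelete.bulkDelete(paths);
    }
  }
}
```

Keeping each request at or below `pageSize()`, and not issuing many such requests in parallel, is what keeps the write load within the limits discussed below.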
+ +If multi-object delete is disabled (or the list of size 1) +* A single `DELETE` call is issued +* Any `AccessDeniedException` raised is converted to a result in the error list. +* Any 404 response from a (non-AWS) store will be ignored. +* Network and other IO errors are raised as exceptions. + +Because there are no probes to ensure the call does not overwrite a directory, +or to see if a parentDirectory marker needs to be created, +this API is still faster than issuing a normal `FileSystem.delete(path)` call. + +That is: all the overhead normally undertaken to preserve the Posix System model are omitted. + + +### S3 Scalability and Performance + +Every entry in a bulk delete request counts as one write operation +against AWS S3 storage. +With the default write rate under a prefix on AWS S3 Standard storage +restricted to 3,500 writes/second, it is very easy to overload +the store by issuing a few bulk delete requests simultaneously. + +* If throttling is triggered then all clients interacting with + the store may observe performance issues. +* The write quota applies even for paths which do not exist. +* The S3A client *may* perform rate throttling as well as page size limiting. + +What does that mean? it means that attempting to issue multiple +bulk delete calls in parallel can be counterproductive. + +When overloaded, the S3 store returns a 403 throttle response. +This will trigger it back off and retry of posting the request. +However, the repeated request will still include the same number of objects and +*so generate the same load*. + +This can lead to a pathological situation where the repeated requests will +never be satisfied because the request itself is sufficient to overload the store. +See [HADOOP-16823.Large DeleteObject requests are their own Thundering Herd](https://issues.apache.org/jira/browse/HADOOP-16823) Review Comment: nit: add a space ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java: ########## @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Collection; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Utility class for bulk delete operations. + */ +public final class BulkDeleteUtils { Review Comment: move to fs.impl ########## hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md: ########## @@ -94,6 +94,85 @@ on the client requirements. </property> ``` +## <a name="bulkdelete"></a> Improving delete performance through bulkdelete API. + +For bulk delete API spec refer to File System specification. 
[BulkDelete](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/bulkdelete.html) + +The S3A client exports this API. + +### S3A Implementation of Bulk Delete. +If multi-object delete is enabled (`fs.s3a.multiobjectdelete.enable` = true), as +it is by default, then the page size is limited to that defined in +`fs.s3a.bulk.delete.page.size`, which MUST be less than or equal to 1000. +* The entire list of paths to delete is aggregated into a single bulk delete request, + issued to the store. +* Provided the caller has the correct permissions, every entry in the list + will, if the path references an object, cause that object to be deleted. +* If the path does not reference an object: the path will not be deleted + "This is for deleting objects, not directories" +* No probes for the existence of parent directories will take place; no + parent directory markers will be created. + "If you need parent directories, call mkdir() yourself" +* The list of failed keys listed in the `DeleteObjectsResponse` response + are converted into paths and returned along with their error messages. +* Network and other IO errors are raised as exceptions. + +If multi-object delete is disabled (or the list of size 1) +* A single `DELETE` call is issued +* Any `AccessDeniedException` raised is converted to a result in the error list. +* Any 404 response from a (non-AWS) store will be ignored. +* Network and other IO errors are raised as exceptions. + +Because there are no probes to ensure the call does not overwrite a directory, +or to see if a parentDirectory marker needs to be created, +this API is still faster than issuing a normal `FileSystem.delete(path)` call. + +That is: all the overhead normally undertaken to preserve the Posix System model are omitted. + + +### S3 Scalability and Performance + +Every entry in a bulk delete request counts as one write operation +against AWS S3 storage. +With the default write rate under a prefix on AWS S3 Standard storage +restricted to 3,500 writes/second, it is very easy to overload +the store by issuing a few bulk delete requests simultaneously. + +* If throttling is triggered then all clients interacting with + the store may observe performance issues. +* The write quota applies even for paths which do not exist. +* The S3A client *may* perform rate throttling as well as page size limiting. + +What does that mean? it means that attempting to issue multiple +bulk delete calls in parallel can be counterproductive. + +When overloaded, the S3 store returns a 403 throttle response. +This will trigger it back off and retry of posting the request. +However, the repeated request will still include the same number of objects and +*so generate the same load*. + +This can lead to a pathological situation where the repeated requests will +never be satisfied because the request itself is sufficient to overload the store. +See [HADOOP-16823.Large DeleteObject requests are their own Thundering Herd](https://issues.apache.org/jira/browse/HADOOP-16823) +for an example of where this did actually surface in production. + +This is why the default page size of S3A clients is 250 paths, not the store limit of 1000 entries. 
+It is also why the S3A delete/rename Operations do not attempt to do massive parallel deletions, Review Comment: nit, lower case "operations" ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefalutBulkDeleteSource.java: ########## @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +/** + * Default implementation of {@link BulkDeleteSource}. + */ +public class DefalutBulkDeleteSource implements BulkDeleteSource { Review Comment: 1. typo 2. move to fs.impl module ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DefaultBulkDeleteOperation.java: ########## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths; + +/** + * Default implementation of the {@link BulkDelete} interface. Review Comment: nit, javadocs to say "uses delete(path, false) call" ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import software.amazon.awssdk.services.s3.S3Client; + +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.store.audit.AuditSpanSource; +import org.apache.hadoop.util.RateLimiting; + +/** + * Builder for the S3AStore. + */ +public class S3AStoreBuilder { + + private StoreContextFactory storeContextFactory; + + private S3Client s3Client; + + private DurationTrackerFactory durationTrackerFactory; + + private S3AInstrumentation instrumentation; + + private S3AStatisticsContext statisticsContext; + + private S3AStorageStatistics storageStatistics; + + private RateLimiting readRateLimiter; + + private RateLimiting writeRateLimiter; + + private AuditSpanSource<AuditSpanS3A> auditSpanSource; + + public S3AStoreBuilder withStoreContextFactory( + final StoreContextFactory storeContextFactoryValue) { + this.storeContextFactory = storeContextFactoryValue; + return this; + } + + public S3AStoreBuilder withS3Client( + final S3Client s3ClientValue) { + this.s3Client = s3ClientValue; + return this; + } + + public S3AStoreBuilder withDurationTrackerFactory( + final DurationTrackerFactory durationTrackerFactoryValue) { + this.durationTrackerFactory = durationTrackerFactoryValue; + return this; + } + + public S3AStoreBuilder withInstrumentation( + final S3AInstrumentation instrumentationValue) { + this.instrumentation = instrumentationValue; + return this; + } + + public S3AStoreBuilder withStatisticsContext( + final S3AStatisticsContext statisticsContextValue) { + this.statisticsContext = statisticsContextValue; + return this; + } + + public S3AStoreBuilder withStorageStatistics( + final S3AStorageStatistics storageStatisticsValue) { + this.storageStatistics = storageStatisticsValue; + return this; + } + + public S3AStoreBuilder withReadRateLimiter( + final RateLimiting readRateLimiterValue) { + this.readRateLimiter = readRateLimiterValue; + return this; + } + + public S3AStoreBuilder withWriteRateLimiter( + final RateLimiting writeRateLimiterValue) { + this.writeRateLimiter = writeRateLimiterValue; + return this; + } + + public S3AStoreBuilder withAuditSpanSource( + final AuditSpanSource<AuditSpanS3A> auditSpanSourceValue) { + this.auditSpanSource = auditSpanSourceValue; + return this; + } + + public S3AStore build() { + return new S3AStoreImpl(storeContextFactory, s3Client, durationTrackerFactory, instrumentation, + statisticsContext, storageStatistics, readRateLimiter, writeRateLimiter, auditSpanSource); + } +} Review Comment: nit, newline ########## hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java: ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.CommonPathCapabilities; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.wrappedio.WrappedIO; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Contract tests for bulk delete operation. + */ +public abstract class AbstractContractBulkDeleteTest extends AbstractFSContractTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class); + + /** + * Page size for bulk delete. This is calculated based + * on the store implementation. + */ + protected int pageSize; + + /** + * Base path for the bulk delete tests. + * All the paths to be deleted should be under this base path. + */ + protected Path basePath; + + /** + * Test file system. + */ + protected FileSystem fs; + + @Before + public void setUp() throws Exception { + fs = getFileSystem(); + basePath = path(getClass().getName()); + pageSize = WrappedIO.bulkDeletePageSize(getFileSystem(), basePath); + fs.mkdirs(basePath); + } + + public Path getBasePath() { + return basePath; + } + + protected int getExpectedPageSize() { + return 1; + } + + /** + * Validate the page size for bulk delete operation. Different stores can have different + * implementations for bulk delete operation thus different page size. + */ + @Test + public void validatePageSize() throws Exception { + Assertions.assertThat(pageSize) + .describedAs("Page size should be 1 by default for all stores") + .isEqualTo(getExpectedPageSize()); + } + + @Test + public void testPathsSizeEqualsPageSizePrecondition() throws Exception { + List<Path> listOfPaths = createListOfPaths(pageSize, basePath); + // Bulk delete call should pass with no exception. + bulkDelete(getFileSystem(), basePath, listOfPaths); + } + + @Test + public void testPathsSizeGreaterThanPageSizePrecondition() throws Exception { + List<Path> listOfPaths = createListOfPaths(pageSize + 1, basePath); + intercept(IllegalArgumentException.class, + () -> bulkDelete(getFileSystem(), basePath, listOfPaths)); + } + + @Test + public void testPathsSizeLessThanPageSizePrecondition() throws Exception { + List<Path> listOfPaths = createListOfPaths(pageSize - 1, basePath); + // Bulk delete call should pass with no exception. 
+ bulkDelete(getFileSystem(), basePath, listOfPaths); + } + + @Test + public void testBulkDeleteSuccessful() throws Exception { + List<Path> listOfPaths = createListOfPaths(pageSize, basePath); + for (Path path : listOfPaths) { + touch(fs, path); + } + FileStatus[] fileStatuses = fs.listStatus(basePath); + Assertions.assertThat(fileStatuses) + .describedAs("File count after create") + .hasSize(pageSize); + assertSuccessfulBulkDelete( + bulkDelete(getFileSystem(), basePath, listOfPaths)); + FileStatus[] fileStatusesAfterDelete = fs.listStatus(basePath); + Assertions.assertThat(fileStatusesAfterDelete) + .describedAs("File statuses should be empty after delete") + .isEmpty(); + } + + @Test + public void validatePathCapabilityDeclared() throws Exception { + Assertions.assertThat(fs.hasPathCapability(basePath, CommonPathCapabilities.BULK_DELETE)) + .describedAs("Path capability BULK_DELETE should be declared") + .isTrue(); + } + + @Test + public void testDeletePathsNotUnderBase() throws Exception { + List<Path> paths = new ArrayList<>(); + Path pathNotUnderBase = path("not-under-base"); + paths.add(pathNotUnderBase); + // Should fail as path is not under the base path. + intercept(IllegalArgumentException.class, + () -> bulkDelete(getFileSystem(), basePath, paths)); + } + + @Test + public void testDeletePathsNotAbsolute() throws Exception { + List<Path> paths = new ArrayList<>(); + Path pathNotAbsolute = new Path("not-absolute"); + paths.add(pathNotAbsolute); + // Should fail as path is not absolute. Review Comment: can you pull these and similar ones below up to javadocs ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface for bulk deletion. + * Filesystems which support bulk deletion should implement this interface + * and MUST also declare their support in the path capability + * {@link CommonPathCapabilities#BULK_DELETE}. + * Exporting the interface does not guarantee that the operation is supported; + * returning a {@link BulkDelete} object from the call {@link #createBulkDelete(Path)} + * is. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDeleteSource { Review Comment: I think the base FileSystem class should implement this, with `createBulkDelete(Path path)` returning an instance of DefaultBulkDeleteOperation bonded to that FS. that way, automatic implementation. 
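Something like this (a sketch, assuming `DefaultBulkDeleteOperation` keeps its current `(Path, FileSystem)` constructor):

```java
// sketch: in FileSystem itself, so every implementation gets the
// page-size-1 fallback for free; stores such as S3A override it
// with their own bulk delete.
public BulkDelete createBulkDelete(Path path)
    throws UnsupportedOperationException, IllegalArgumentException, IOException {
  return new DefaultBulkDeleteOperation(path, this);
}
```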
If we can do that then:
* no need for DefaultBulkDeleteSource
* we could actually cut the `default` implementation here

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org