[ 
https://issues.apache.org/jira/browse/HADOOP-19385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17916868#comment-17916868
 ] 

ASF GitHub Bot commented on HADOOP-19385:
-----------------------------------------

cnauroth commented on code in PR #7316:
URL: https://github.com/apache/hadoop/pull/7316#discussion_r1929391921


##########
hadoop-tools/hadoop-aws/src/test/java17/org/apache/fs/test/formats/AbstractIcebergDeleteTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.fs.test.formats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.io.wrappedio.impl.DynamicWrappedIO;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+
+import static 
org.apache.hadoop.fs.contract.ContractTestUtils.assertSuccessfulBulkDelete;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for iceberg bulk delete operation,
+ * verifying
+ */
+public abstract class AbstractIcebergDeleteTest extends 
AbstractFSContractTestBase {
+
+  private static final Logger LOG =
+          LoggerFactory.getLogger(AbstractIcebergDeleteTest.class);
+
+  private static final String DELETE_FILE_PARALLELISM = 
"iceberg.hadoop.delete-file-parallelism";
+
+  /** Is bulk delete enabled on hadoop runtimes with API support: {@value}. */
+  public static final String ICEBERG_BULK_DELETE_ENABLED = 
"iceberg.hadoop.bulk.delete.enabled";
+
+  /**
+   * Page size for bulk delete. This is calculated based
+   * on the store implementation.
+   */
+  protected int pageSize;
+
+  /**
+   * Base path for the bulk delete tests.
+   * All the paths to be deleted should be under this base path.
+   */
+  protected Path basePath;
+
+  /**
+   * Reflection support.
+   */
+  private DynamicWrappedIO dynamicWrappedIO;
+
+  /**
+   * Create a configuration with the iceberg settings
+   * added.
+   * @return a configuration for subclasses to extend
+   */
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    FileSystem fs = getFileSystem();
+    basePath = path(getClass().getName());
+    dynamicWrappedIO = new DynamicWrappedIO();
+    pageSize = dynamicWrappedIO.bulkDelete_pageSize(fs, basePath);
+    fs.mkdirs(basePath);

Review Comment:
   Assert that `mkdirs` succeeds (returns `true`)?



##########
hadoop-tools/hadoop-aws/src/test/java17/org/apache/hadoop/fs/contract/s3a/ITestIcebergBulkDelete.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.fs.test.formats.AbstractIcebergDeleteTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Lists;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+
+import static 
org.apache.hadoop.fs.contract.ContractTestUtils.assertPathsDoNotExist;
+import static 
org.apache.hadoop.fs.contract.ContractTestUtils.assertSuccessfulBulkDelete;
+import static 
org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusOrNull;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.PERFORMANCE_FLAGS;
+import static 
org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test Iceberg Bulk Delete API.
+ * <p>
+ * Parameterized on Iceberg bulk delete enabled/disabled and
+ * s3a multi-object delete enabled/disabled.
+ */
+@RunWith(Parameterized.class)
+public class ITestIcebergBulkDelete extends AbstractIcebergDeleteTest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ITestIcebergBulkDelete.class);
+
+  /**
+   * Parallelism when using the classic multi-thread bulk delete.
+   */
+  private static final String ICEBERG_DELETE_FILE_PARALLELISM =
+      "iceberg.hadoop.delete-file-parallelism";
+
+  /** Is bulk delete enabled on hadoop runtimes with API support: {@value}. */
+  public static final String ICEBERG_BULK_DELETE_ENABLED = 
"iceberg.hadoop.bulk.delete.enabled";
+
+  private static final int DELETE_PAGE_SIZE = 3;
+
+  private static final int DELETE_FILE_COUNT = 7;
+
+  @Parameterized.Parameters(name = "multiobjectdelete-{0}-usebulk-{1}")
+  public static Iterable<Object[]> enableMultiObjectDelete() {
+    return Arrays.asList(new Object[][]{
+        {true, true},
+        {true, false},
+        {false, true},
+        {false, false}
+    });
+  }
+
+  /**
+   * Enable s3a multi object delete.
+   */
+  private final boolean enableMultiObjectDelete;
+
+  /**
+   * Enable bulk delete in iceberg.
+   */
+  private final boolean useBulk;
+
+  public ITestIcebergBulkDelete(boolean enableMultiObjectDelete, final boolean 
useBulk) {
+    this.enableMultiObjectDelete = enableMultiObjectDelete;
+    this.useBulk = useBulk;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    // close all filesystems.
+    FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+
+    // then create the single new one
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf = propagateBucketOptions(conf, getTestBucketName(conf));
+    removeBaseAndBucketOverrides(conf,
+        BULK_DELETE_PAGE_SIZE);
+    // turn the caching on else every call refreshes the cache
+    conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
+    conf.setInt(BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE);
+
+    // skip this test run if multi-delete is explicitly disabled;
+    // this is needed to test against third party stores
+    // which do not support it.
+    if (enableMultiObjectDelete) {
+      skipIfNotEnabled(conf, ENABLE_MULTI_DELETE, "multi object delete is 
disabled");
+    }
+    conf.setBoolean(ENABLE_MULTI_DELETE, enableMultiObjectDelete);
+    conf.setBoolean(ICEBERG_BULK_DELETE_ENABLED, useBulk);
+    conf.setInt(ICEBERG_DELETE_FILE_PARALLELISM, 5);
+    // speed up file/dir creation
+    conf.set(FS_S3A_PERFORMANCE_FLAGS, PERFORMANCE_FLAGS);
+    return conf;
+  }
+
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(createConfiguration());
+  }
+
+  @Override
+  protected int getExpectedPageSize() {
+    return enableMultiObjectDelete
+        ? 1
+        : DELETE_PAGE_SIZE;
+  }
+
+  /**
+   * Create file IO; includes an assert that bulk delete is enabled.
+   * @return a file IO
+   */
+  private HadoopFileIO createFileIO() {
+    final Configuration conf = getFileSystem().getConf();
+
+    final HadoopFileIO fileIO = new HadoopFileIO(conf);
+    // assert that bulk delete loaded.
+    Assertions.assertThat(fileIO.isBulkDeleteApiUsed())
+        .describedAs("is HadoopFileIO able to load Hadoop bulk delete")
+        .isEqualTo(useBulk);
+    return fileIO;
+  }
+
+  /**
+   * Delete a single file using the bulk delete API.
+   */
+  @Test
+  public void testDeleteSingleFile() throws Throwable {
+    Path path = new Path(methodPath(), "../single");
+    try (HadoopFileIO fileIO = createFileIO()) {
+      final List<String> filename = stringList(path);
+      LOG.info("Deleting empty path");
+      fileIO.deleteFiles(filename);
+      // now one file
+      final FileSystem fs = getFileSystem();
+      touch(fs, path);
+      LOG.info("Deleting file at {}", filename);
+      fileIO.deleteFiles(filename);
+      assertPathsDoNotExist(fs, "should have been deleted", path);
+    }
+  }
+
+  /**
+   * A directory is not deleted through the bulk delete API,
+   * but does not report a failure.
+   * The classic invocation mechanism reports a failure.
+   */
+  @Test
+  public void testDeleteDirectory() throws Throwable {
+    Path path = methodPath();
+
+    try (HadoopFileIO fileIO = createFileIO()) {
+      final List<String> filename = stringList(path);
+
+      // create a directory and a child underneath
+      Path child = new Path(path, "child");
+      final FileSystem fs = getFileSystem();
+      fs.mkdirs(path);
+      touch(fs, child);
+
+      LOG.info("Deleting path to directory");
+      if (useBulk) {
+        fileIO.deleteFiles(filename);
+      } else {
+        final BulkDeletionFailureException ex =
+            intercept(BulkDeletionFailureException.class, () ->
+                fileIO.deleteFiles(filename));
+        Assertions.assertThat(ex.numberFailedObjects())
+            .describedAs("Failure count in %s", ex)
+            .isEqualTo(1);
+      }
+      // Reported failure or not, the directory is still found
+      assertPathExists("directory was not deleted", path);
+    }
+  }
+
+  /**
+   * A directory is not deleted through the bulk delete API,
+   * it is through the classic single file delete.
+   * The assertions match this behavior.
+   * <p>
+   * Note that the semantics of FileSystem.delete(path, nonrecursive)
+   * have special handling of deleting an empty directory, where
+   * it is allowed (as with unix cli rm), so a child file
+   * is created to force stricter semantics.
+   */
+  @Test
+  public void testDeleteDirectoryDirect() throws Throwable {
+    //Path path = new Path(methodPath(), "../single");
+    Path path = methodPath();
+    try (HadoopFileIO fileIO = createFileIO()) {
+
+      // create a directory and a child underneath
+      Path child = new Path(path, "child");
+      final FileSystem fs = getFileSystem();
+
+      fs.mkdirs(path);
+      touch(fs, child);
+
+      LOG.info("Deleting path to directory via deleteFile");
+      intercept(RuntimeIOException.class, () ->
+      {
+        final String s = toString(path);
+        fileIO.deleteFile(s);
+        final FileStatus st = getFileStatusOrNull(fs, path);
+        return String.format("Expected failure deleting %s but none raised. 
Path status: %s",
+            path, st);
+      });
+    }
+  }
+
+  @Test
+  public void testDeleteManyFiles() throws Throwable {
+    Path path = methodPath();
+    final FileSystem fs = getFileSystem();
+    final List<Path> files = createFiles(fs, path, 1, DELETE_FILE_COUNT, 0);
+    try (HadoopFileIO fileIO = createFileIO()) {
+      fileIO.deleteFiles(stringList(files));
+      for (Path p : files) {
+        assertPathDoesNotExist("expected deletion", p);
+      }
+    }
+  }
+
+  /**
+   * Use a more complex filename.
+   * This validates that any conversions to URI/string
+   * when passing to an object store is correct.
+   */
+  @Test
+  public void testDeleteComplexFilename() throws Exception {
+    Path path = new Path(basePath, "child[=comple]x");
+    List<Path> paths = new ArrayList<>();
+    paths.add(path);
+    // bulk delete call doesn't verify if a path exists or not before deleting.
+    assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, 
paths));
+  }
+
+  public static List<String> stringList(List<Path> files) {
+    return files.stream().map(p -> toString(p)).collect(Collectors.toList());

Review Comment:
   Can this be reduced to:
   
   ```
       return files.stream().map(Path::toString).collect(Collectors.toList());
   ```
   
   ...and then remove the static `toString` method below? It just seems to turn 
around and call `toString()` on the object.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java:
##########
@@ -1825,6 +1826,48 @@ public static long totalReadSize(final List<FileRange> 
fileRanges) {
         .sum();
   }
 
+  /**
+   * Assert on returned entries after bulk delete operation.
+   * Entries should be empty after successful delete.
+   */
+  public static void assertSuccessfulBulkDelete(List<Map.Entry<Path, String>> 
entries) {
+    Assertions.assertThat(entries)
+            .describedAs("Bulk delete failed, " +
+                    "return entries should be empty after successful delete")
+            .isEmpty();
+  }
+
+  /**
+   * Get a file status value or, if the path doesn't exist, return null.

Review Comment:
   Perhaps an opportunity to return `Optional<FileStatus>`?





> S3A: add a file-format-parsing module for testing format parsing
> ----------------------------------------------------------------
>
>                 Key: HADOOP-19385
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19385
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure, fs/s3
>    Affects Versions: 3.4.2
>            Reporter: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> Create a cloud-storage/format-parsing module declaring various file formats 
> as dependencies (parquet, iceberg, orc) purely for integration/regression 
> testing store support for them.
> h2. Parquet
> for parquet reading we'd want
> * parquet lib
> * samples of well formed files
> * samples of malformed files.
> Test runs would upload the files then open them.
> h2. Iceberg
> Verify bulk delete through iceberg FileIO api. 
> *Update: Iceberg needs java17*
> It can't be merged until hadoop trunk goes there. The parquet stuff we can put in 
> earlier and backport.
> This does let me set up the module, though.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to