[ 
https://issues.apache.org/jira/browse/HADOOP-19217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18049918#comment-18049918
 ] 

ASF GitHub Bot commented on HADOOP-19217:
-----------------------------------------

steveloughran commented on code in PR #8063:
URL: https://github.com/apache/hadoop/pull/8063#discussion_r2662533375


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java:
##########
@@ -146,18 +156,19 @@ public static TrashPolicy getInstance(Configuration conf, FileSystem fs, Path ho
   }
 
   /**
-   * Get an instance of the configured TrashPolicy based on the value
-   * of the configuration parameter fs.trash.classname.
+   * Get an instance of the TrashPolicy associated with the FileSystem implementation of
+   * {@link FileSystem#getTrashPolicy(Configuration)}. The configuration passed might be used
+   * by the FileSystem implementation to pick the {@link TrashPolicy} implementation. The default
+   * {@link FileSystem#getTrashPolicy(Configuration)} checks fs.trash.classname to pick the
+   * {@link TrashPolicy} implementation.
    *
    * @param conf the configuration to be used
    * @param fs the file system to be used
    * @return an instance of TrashPolicy
    */
   public static TrashPolicy getInstance(Configuration conf, FileSystem fs) {
-    Class<? extends TrashPolicy> trashClass = conf.getClass(
-        "fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class);
-    TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf);
-    trash.initialize(conf, fs); // initialize TrashPolicy
-    return trash;
+    TrashPolicy trashPolicy = fs.getTrashPolicy(conf);

Review Comment:
   For this (legacy) static call, pass in a path of "/" to the getTrashPolicy() call.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java:
##########
@@ -3429,6 +3429,29 @@ public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
         + " doesn't support getAllStoragePolicies");
   }
 
+  /**
+   * Get the trash policy implementation used by this FileSystem. This trash policy
+   * is used by classes of {@link Trash} to implement the trash behavior.
+   * <p>
+   * FileSystem implementation can consider overriding this method to handle
+   * situation where a single FileSystem client shares a configuration, but
+   * each FileSystem scheme requires a distinct TrashPolicy implementation.
+   *
+   * @param conf configuration which can be used to choose the TrashPolicy
+   *             implementation.
+   * @return TrashPolicy implementation by this filesystem.
+   *         The default implementation returns the configured TrashPolicy
+   *         based on the value of the configuration parameter fs.trash.classname
+   *         of the passed configuration.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public TrashPolicy getTrashPolicy(Configuration conf) {
+    Class<? extends TrashPolicy> trashClass = conf.getClass(
+        "fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class);

Review Comment:
   * Make the "fs.trash.classname" key a constant somewhere.
   * Log the loaded trash policy at debug level, e.g. "default filesystem trash policy loaded policy <classname>".
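
   A minimal sketch of both points, not the PR's code. The constant name `FS_TRASH_CLASSNAME_KEY` is hypothetical; only the key string and the `TrashPolicyDefault` fallback come from the diff. A plain `Map` stands in for `Configuration` and `java.util.logging` for the project's logger, so the block runs without Hadoop on the classpath:

```java
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the lookup inside FileSystem#getTrashPolicy. */
final class TrashPolicyKeySketch {

  /** Hypothetical constant name; the key string itself is taken from the diff. */
  static final String FS_TRASH_CLASSNAME_KEY = "fs.trash.classname";

  /** Stand-in for the TrashPolicyDefault.class fallback used in the diff. */
  static final String DEFAULT_POLICY = "org.apache.hadoop.fs.TrashPolicyDefault";

  private static final Logger LOG =
      Logger.getLogger(TrashPolicyKeySketch.class.getName());

  /** Resolve the policy class name from the (stand-in) configuration and log it. */
  static String resolvePolicyClassName(Map<String, String> conf) {
    String name = conf.getOrDefault(FS_TRASH_CLASSNAME_KEY, DEFAULT_POLICY);
    // FINE is the java.util.logging level closest to "debug".
    LOG.log(Level.FINE, "default filesystem trash policy loaded policy {0}", name);
    return name;
  }
}
```

   In the real method the same constant would be passed to `conf.getClass(...)` in place of the string literal.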



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
+/**
+ * Test the {@link TrashPolicy} returned by {@link FileSystem#getTrashPolicy}
+ * results in a consistent trash behavior.
+ * <p>
+ *   Consistent trash behavior means that invoking the {@link TrashPolicy} methods in the
+ *   following order should not result in unexpected results such as files in trash that
+ *   will never be deleted by trash mechanism.
+ *   <ol>
+ *   <li>
+ *     {@link TrashPolicy#getDeletionInterval()} should return 0 before
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked.
+ *     The deletion interval should not return negative value. Zero value implies
+ *     that trash is disabled, which means {@link TrashPolicy#isEnabled()} should
+ *     return false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} should be implemented
+ *     and ensure that the subsequent {@link TrashPolicy} operations should work properly
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#isEnabled()} should return true after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked and
+ *     initialize the deletion interval to positive value. {@link TrashPolicy#isEnabled()}
+ *     should remain false if the {@link TrashPolicy#getDeletionInterval()}} returns 0 even after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} has been invoked.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should move a file or directory to the
+ *     current trash directory defined {@link TrashPolicy#getCurrentTrashDir(Path)}
+ *     if it's not already in the trash. This implies that
+ *     the {@link FileSystem#exists(Path)} should return false for the original path, but
+ *     should return true for the current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should return false if {@link TrashPolicy#isEnabled()} is false or
+ *     the path is already under {@link FileSystem#getTrashRoot(Path)}. There should not be any side
+ *     effect when {@link TrashPolicy#moveToTrash(Path)} returns false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#createCheckpoint()} should create rename the current trash directory to
+ *     another trash directory which is not equal to {@link TrashPolicy#getCurrentTrashDir(Path)}.
+ *     {@link TrashPolicy#createCheckpoint()} is a no-op if there is no current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpoint()} should cleanup the all the current
+ *     and checkpoint directories under {@link FileSystem#getTrashRoots(boolean)} created before
+ *     {@link TrashPolicy#getDeletionInterval()} minutes ago.
+ *     Note that the current trash directory {@link TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpointsImmediately()} should cleanup the checkpoint directories under
+ *     {@link FileSystem#getTrashRoots(boolean)} regardless of the checkpoint timestamp.
+ *     Note that the current trash directory {@link TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#getEmptier()} returns a runnable that will empty the trash.
+ *     The effective trash emptier interval should be [0, {@link TrashPolicy#getDeletionInterval()}].
+ *     Zero interval means that {@link Runnable#run()} is a no-op and returns immediately.
+ *     Non-zero trash emptier interval means that the {@link Runnable#run()} keeps running for
+ *     each interval (unless it is interrupted). For each interval, the trash emptier carry out the
+ *     following operations:
+ *     <ol>
+ *       It checks all the trash root directories through {@link FileSystem#getTrashRoots(boolean)} for all users.
+ *     </ol>
+ *     <ol>
+ *       For each trash root directory, it deletes the trash checkpoint directory with checkpoint time older than
+ *       {@link TrashPolicy#getDeletionInterval()}. Afterward, it creates a new trash checkpoint through
+ *       {@link TrashPolicy#createCheckpoint()}. Note that existing checkpoints which has not expired, will not
+ *       have any change.
+ *     </ol>
+ *   </li>
+ *   </ol>
+ * </p>
+ */
+public abstract class AbstractContractTrashTest extends AbstractFSContractTestBase {
+
+  @BeforeEach
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // Enable trash with 12 seconds deletes and 6 seconds checkpoints
+    conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds
+    conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds
+    return conf;
+  }
+
+  @AfterEach
+  @Override
+  public void teardown() throws Exception {
+    final FileSystem fs = getFileSystem();

Review Comment:
   Add a try/catch here so that if both the test and the teardown fail, the
teardown failure doesn't hide the test failure.
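
   A minimal sketch of that pattern, not the PR's code. `SafeTeardownSketch`, `Cleanup`, and `WARNINGS` are hypothetical names; in the real test the cleanup would be the trash-root deletion, the second step would be `super.teardown()`, and the message would go to the test's logger instead of a list:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in showing a teardown that cannot mask a test failure. */
final class SafeTeardownSketch {

  /** A cleanup step that may throw, e.g. deleting the trash roots. */
  interface Cleanup {
    void run() throws Exception;
  }

  /** Recorded messages; a real test would log these at WARN instead. */
  static final List<String> WARNINGS = new ArrayList<>();

  /**
   * Run the cleanup, recording rather than rethrowing any exception,
   * and always run the parent teardown step afterwards.
   */
  static void teardown(Cleanup trashCleanup, Runnable superTeardown) {
    try {
      trashCleanup.run();
    } catch (Exception e) {
      // Swallow the cleanup failure so the original test failure stays visible.
      WARNINGS.add("teardown cleanup failed: " + e);
    } finally {
      superTeardown.run();
    }
  }
}
```

   The trade-off is that a cleanup failure on an otherwise passing test is only logged, so leftover trash directories have to be spotted in the log output.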



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+public abstract class AbstractContractTrashTest extends AbstractFSContractTestBase {
+
+  @BeforeEach
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // Enable trash with 12 seconds deletes and 6 seconds checkpoints
+    conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds
+    conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds
+    return conf;
+  }
+
+  @AfterEach
+  @Override
+  public void teardown() throws Exception {
+    final FileSystem fs = getFileSystem();
+    Collection<FileStatus> trashRoots = fs.getTrashRoots(true);
+    for (FileStatus trashRoot : trashRoots) {
+      fs.delete(trashRoot.getPath(), true);
+    }
+    super.teardown();
+  }
+
+  @Test
+  public void testTrashPolicy() throws Throwable {
+    final FileSystem fs = getFileSystem();
+
+    // TrashPolicy needs to be initialized with non-zero deletion interval before
+    // TrashPolicy#isEnabled returns true
+    final TrashPolicy trashPolicy = fs.getTrashPolicy(getContract().getConf());
+    assertFalse(trashPolicy.isEnabled());
+    assertEquals(0, trashPolicy.getDeletionInterval());
+    assertFalse(trashPolicy.moveToTrash(new Path("randomFile")));
+    trashPolicy.initialize(getContract().getConf(), fs);
+    assertTrue(trashPolicy.isEnabled());
+    assertTrue(trashPolicy.getDeletionInterval() > 0);
+
+    // Check that the current directory is still empty even if checkpoints operation is run
+    assertPathDoesNotExist("trash current directory should not exist before moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpointsImmediately();
+    assertPathDoesNotExist("trash current directory should not exist before moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+
+    // TrashPolicy#moveToTrash should move the file to the current trash directory
+    Path base = methodPath();
+    mkdirs(base);
+    Path fileToDelete = new Path(base, "testFile");
+    byte[] data = ContractTestUtils.dataset(256, 'a', 'z');
+    ContractTestUtils.writeDataset(fs, fileToDelete, data, data.length, 1024 * 1024, false);
+
+    assertTrue(trashPolicy.moveToTrash(fileToDelete));
+    assertPathExists("trash current directory should exist after moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    Path expectedCurrentTrashPath = Path.mergePaths(trashPolicy.getCurrentTrashDir(fileToDelete), fileToDelete);;
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+    // Calling TrashPolicy#moveToTrash on the key in path should return false
+    // and the file remains unchanged
+    assertFalse(trashPolicy.moveToTrash(expectedCurrentTrashPath));
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // Calling TrashPolicy#deleteCheckpoint or TrashPolicy#deleteCheckpointImmediately has no effect on the
+    // current trash directory.
+    trashPolicy.deleteCheckpoint();
+    trashPolicy.deleteCheckpointsImmediately();
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // TrashPolicy#createCheckpoint rename the current trash directory to a new directory
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist after checkpoint",
+        trashPolicy.getCurrentTrashDir(fileToDelete));
+    assertPathDoesNotExist("the path under current trash directory should not exist after checkpoint",
+        expectedCurrentTrashPath);
+    FileStatus[] trashRootChildren = ContractTestUtils.listChildren(fs, fs.getTrashRoot(fileToDelete));
+    assertEquals(1, trashRootChildren.length);
+    FileStatus trashCheckpointDir = trashRootChildren[0];
+    Path expectedCheckpointTrashPath = Path.mergePaths(trashCheckpointDir.getPath(), fileToDelete);
+    ContractTestUtils.verifyFileContents(fs, expectedCheckpointTrashPath, data);
+
+    // TrashPolicy#deleteCheckpoint
+    Thread.sleep(12000); // This should be the time set as deletion interval
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("the path under checkpoint directory should be deleted",
+        expectedCheckpointTrashPath);
+    trashRootChildren = ContractTestUtils.listChildren(fs, fs.getTrashRoot(fileToDelete));
+    assertEquals(0, trashRootChildren.length);
+  }
+
+  @Test
+  public void testEmptier() throws Throwable {
+    // Adapted from TestTrash#testTrashEmptier.
+    final FileSystem fs = getFileSystem();
+
+    // Start Emptier in background
+    final TrashPolicy trashPolicy = fs.getTrashPolicy(getContract().getConf());
+    trashPolicy.initialize(getContract().getConf(), fs);
+
+    Runnable emptier = trashPolicy.getEmptier();
+    Thread emptierThread = new Thread(emptier);
+    emptierThread.start();
+
+    // First create a new directory with mkdirs
+    Path base = methodPath();
+    mkdirs(base);
+    int fileIndex = 0;
+    Set<String> checkpoints = new HashSet<>();
+    while (true) {
+      // Create a file with a new name
+      Path myFile = new Path(base, "myFile" + fileIndex);
+      ContractTestUtils.writeTextFile(fs, myFile, "file" + fileIndex, false);
+      fileIndex++;
+
+      // Move the files to trash
+      assertTrue(trashPolicy.moveToTrash(myFile));
+
+      Path trashDir = trashPolicy.getCurrentTrashDir(myFile);
+      FileStatus files[] = fs.listStatus(trashDir.getParent());
+      // Scan files in .Trash and add them to set of checkpoints
+      for (FileStatus file : files) {
+        String fileName = file.getPath().getName();
+        checkpoints.add(fileName);
+      }
+      // If checkpoints has 4 objects it is Current + 3 checkpoint directories
+      if (checkpoints.size() == 4) {
+        // The actual contents should be smaller since the last checkpoint
+        // should've been deleted and Current might not have been recreated yet
+        assertTrue(checkpoints.size() > files.length);
+        break;
+      }
+      Thread.sleep(5000);
+    }
+    emptierThread.interrupt();
+    emptierThread.join();
+  }
+
+  @Test
+  public void testTrash() throws Throwable {
+    // Adapted from TestTrash#testTrash. There are some tests that are excluded,
+    // such as checkpoint format tests since the trash does not specify the trash
+    // checkpoint requirements
+    final FileSystem fs = getFileSystem();
+    Trash trash = new Trash(fs, getContract().getConf());
+
+    // First create a new directory with mkdirs
+    Path baseDir = methodPath();
+    mkdirs(baseDir);
+
+    // Create a file in that directory
+    Path myFile = new Path(baseDir, "myFile");
+    ContractTestUtils.writeTextFile(fs, myFile, "myFileContent", false);
+
+    // Verify that expunge without Trash directory will not throw Exception
+    trash.expunge();
+
+    // Verify that we succeed in removing the file we created
+    // This should go into Trash.
+    {
+      assertTrue(trash.moveToTrash(myFile));
+      Path currenTrashDir = trash.getCurrentTrashDir(myFile);
+      Path expectedCurrentTrashFile = Path.mergePaths(currenTrashDir, myFile);
+      assertPathExists("File should be moved to trash", expectedCurrentTrashFile);
+    }
+
+    // Verify that we can recreate the file
+    ContractTestUtils.writeTextFile(fs, myFile, "myFileContent", false);
+
+    // Verify that we succeed in removing the file we re-created
+    assertTrue(trash.moveToTrash(myFile));

Review Comment:
   How about factoring this out into an assertMovedToTrash(trash, Path) so the
assertTrue and error message can be used everywhere?
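
   A minimal sketch of such a helper, not the PR's code. `TrashMover` is a hypothetical stand-in for the `moveToTrash` call on `Trash` or `TrashPolicy`, and a `String` stands in for `Path`, so the block compiles without Hadoop; the real `Trash.moveToTrash(Path)` also declares `IOException`, which the helper would have to propagate:

```java
/** Hypothetical sketch of a shared assertion helper for the trash tests. */
final class TrashAssertionsSketch {

  /** Minimal stand-in for an object exposing moveToTrash. */
  interface TrashMover {
    boolean moveToTrash(String path);
  }

  /**
   * Assert that the path was moved to trash, using one failure message
   * everywhere instead of a bare assertTrue at each call site.
   */
  static void assertMovedToTrash(TrashMover trash, String path) {
    if (!trash.moveToTrash(path)) {
      throw new AssertionError("Failed to move " + path + " to trash");
    }
  }
}
```

   Each `assertTrue(trash.moveToTrash(myFile))` in the test would then become `assertMovedToTrash(trash, myFile)`, and a failure names the path involved.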



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+/**
+ * Test the {@link TrashPolicy} returned by {@link FileSystem#getTrashPolicy}
+ * results in a consistent trash behavior.
+ * <p>
+ *   Consistent trash behavior means that invoking the {@link TrashPolicy} methods in the
+ *   following order should not result in unexpected results such as files in trash that
+ *   will never be deleted by trash mechanism.
+ *   <ol>
+ *   <li>

Review Comment:
   Move these into the filesystem.md specification; the contract test then
becomes the verification of these rules, rather than the place where they are
written for the first time.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+public abstract class AbstractContractTrashTest extends 
AbstractFSContractTestBase {
+
+  @BeforeEach
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // Enable trash with 12 seconds deletes and 6 seconds checkpoints
+    conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds
+    conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds
+    return conf;
+  }
+
+  @AfterEach
+  @Override
+  public void teardown() throws Exception {
+    final FileSystem fs = getFileSystem();
+    Collection<FileStatus> trashRoots = fs.getTrashRoots(true);
+    for (FileStatus trashRoot : trashRoots) {
+      fs.delete(trashRoot.getPath(), true);
+    }
+    super.teardown();
+  }
+
+  @Test
+  public void testTrashPolicy() throws Throwable {
+    final FileSystem fs = getFileSystem();
+
+    // TrashPolicy needs to be initialized with non-zero deletion interval 
before
+    // TrashPolicy#isEnabled returns true
+    final TrashPolicy trashPolicy = fs.getTrashPolicy(getContract().getConf());
+    assertFalse(trashPolicy.isEnabled());
+    assertEquals(0, trashPolicy.getDeletionInterval());
+    assertFalse(trashPolicy.moveToTrash(new Path("randomFile")));
+    trashPolicy.initialize(getContract().getConf(), fs);
+    assertTrue(trashPolicy.isEnabled());
+    assertTrue(trashPolicy.getDeletionInterval() > 0);
+
+    // Check that the current directory is still empty even if checkpoints 
operation is run
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpointsImmediately();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+
+    // TrashPolicy#moveToTrash should move the file to the current trash 
directory
+    Path base = methodPath();
+    mkdirs(base);
+    Path fileToDelete = new Path(base, "testFile");
+    byte[] data = ContractTestUtils.dataset(256, 'a', 'z');
+    ContractTestUtils.writeDataset(fs, fileToDelete, data, data.length, 1024 * 
1024, false);
+
+    assertTrue(trashPolicy.moveToTrash(fileToDelete));
+    assertPathExists("trash current directory should exist after moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    Path expectedCurrentTrashPath = 
Path.mergePaths(trashPolicy.getCurrentTrashDir(fileToDelete), fileToDelete);;
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+    // Calling TrashPolicy#moveToTrash on the key in path should return false
+    // and the file remains unchanged
+    assertFalse(trashPolicy.moveToTrash(expectedCurrentTrashPath));
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // Calling TrashPolicy#deleteCheckpoint or 
TrashPolicy#deleteCheckpointImmediately has no effect on the
+    // current trash directory.
+    trashPolicy.deleteCheckpoint();
+    trashPolicy.deleteCheckpointsImmediately();
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // TrashPolicy#createCheckpoint rename the current trash directory to a 
new directory
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist after 
checkpoint",
+        trashPolicy.getCurrentTrashDir(fileToDelete));
+    assertPathDoesNotExist("the path under current trash directory should not 
exist after checkpoint",
+        expectedCurrentTrashPath);
+    FileStatus[] trashRootChildren = ContractTestUtils.listChildren(fs, 
fs.getTrashRoot(fileToDelete));
+    assertEquals(1, trashRootChildren.length);
+    FileStatus trashCheckpointDir = trashRootChildren[0];
+    Path expectedCheckpointTrashPath = 
Path.mergePaths(trashCheckpointDir.getPath(), fileToDelete);
+    ContractTestUtils.verifyFileContents(fs, expectedCheckpointTrashPath, 
data);
+
+    // TrashPolicy#deleteCheckpoint
+    Thread.sleep(12000); // This should be the time set as deletion interval
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("the path under checkpoint directory should be 
deleted",
+        expectedCheckpointTrashPath);
+    trashRootChildren = ContractTestUtils.listChildren(fs, 
fs.getTrashRoot(fileToDelete));
+    assertEquals(0, trashRootChildren.length);
+  }
+
+  @Test
+  public void testEmptier() throws Throwable {
+    // Adapted from TestTrash#testTrashEmptier.
+    final FileSystem fs = getFileSystem();
+
+    // Start Emptier in background
+    final TrashPolicy trashPolicy = fs.getTrashPolicy(getContract().getConf());
+    trashPolicy.initialize(getContract().getConf(), fs);
+
+    Runnable emptier = trashPolicy.getEmptier();
+    Thread emptierThread = new Thread(emptier);
+    emptierThread.start();
+
+    // First create a new directory with mkdirs
+    Path base = methodPath();
+    mkdirs(base);
+    int fileIndex = 0;
+    Set<String> checkpoints = new HashSet<>();
+    while (true) {
+      // Create a file with a new name
+      Path myFile = new Path(base, "myFile" + fileIndex);
+      ContractTestUtils.writeTextFile(fs, myFile, "file" + fileIndex, false);
+      fileIndex++;
+
+      // Move the files to trash
+      assertTrue(trashPolicy.moveToTrash(myFile));

Review Comment:
   Add an error message for the assertion failure, and include the filename.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java:
##########
@@ -3429,6 +3429,29 @@ public Collection<? extends BlockStoragePolicySpi> 
getAllStoragePolicies()
         + " doesn't support getAllStoragePolicies");
   }
 
+  /**
+   * Get the trash policy implementation used by this FileSystem. This trash 
policy
+   * is used by classes of {@link Trash} to implement the trash behavior.
+   * <p>
+   * FileSystem implementation can consider overriding this method to handle
+   * situation where a single FileSystem client shares a configuration, but
+   * each FileSystem scheme requires a distinct TrashPolicy implementation.
+   *
+   * @param conf configuration which can be used to choose the TrashPolicy
+   *             implementation.
+   * @return TrashPolicy implementation by this filesystem.
+   *         The default implementation returns the configured TrashPolicy
+   *         based on the value of the configuration parameter 
fs.trash.classname
+   *         of the passed configuration.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public TrashPolicy getTrashPolicy(Configuration conf) {

Review Comment:
   Have it take a Path argument.
   
   That will allow viewfs to resolve through its mount points to the final value.
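   
   To illustrate why the Path argument matters, here is a minimal, stdlib-only
   sketch of picking a policy by resolving a path through mount points. It is
   not the viewfs implementation: `MountTableSketch`, `addMount` and
   `policyFor` are hypothetical names, and the policies are plain strings
   standing in for TrashPolicy instances.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a mount table; not Hadoop's viewfs code. */
public class MountTableSketch {
  // Mount point prefix -> name of the policy used by the target filesystem.
  private final Map<String, String> mounts = new TreeMap<>();
  private final String defaultPolicy;

  public MountTableSketch(String defaultPolicy) {
    this.defaultPolicy = defaultPolicy;
  }

  public void addMount(String prefix, String policy) {
    mounts.put(prefix, policy);
  }

  /** Pick the policy of the longest mount point that contains the path. */
  public String policyFor(String path) {
    String best = null;
    for (String prefix : mounts.keySet()) {
      boolean matches = path.equals(prefix) || path.startsWith(prefix + "/");
      if (matches && (best == null || prefix.length() > best.length())) {
        best = prefix;
      }
    }
    return best == null ? defaultPolicy : mounts.get(best);
  }

  public static void main(String[] args) {
    MountTableSketch table = new MountTableSketch("default");
    table.addMount("/data", "hdfs-policy");
    table.addMount("/data/cold", "object-store-policy");
    // Different paths in one client resolve to different policies.
    System.out.println(table.policyFor("/data/cold/f1"));
    System.out.println(table.policyFor("/data/hot/f2"));
    System.out.println(table.policyFor("/tmp/f3"));
  }
}
```

   A lookup that only receives the configuration has nothing to resolve
   against, so it could only ever return one policy per client.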



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java:
##########
@@ -146,18 +156,19 @@ public static TrashPolicy getInstance(Configuration conf, 
FileSystem fs, Path ho
   }
 
   /**
-   * Get an instance of the configured TrashPolicy based on the value
-   * of the configuration parameter fs.trash.classname.
+   * Get an instance of the TrashPolicy associated with the FileSystem 
implementation of
+   * {@link FileSystem#getTrashPolicy(Configuration)}. The configuration 
passed might be used
+   * by the FileSystem implementation to pick the {@link TrashPolicy} 
implementation. The default
+   * {@link FileSystem#getTrashPolicy(Configuration)} checks 
fs.trash.classname to pick the
+   * {@link TrashPolicy} implementation.
    *
    * @param conf the configuration to be used
    * @param fs the file system to be used
    * @return an instance of TrashPolicy
    */
   public static TrashPolicy getInstance(Configuration conf, FileSystem fs) {
-    Class<? extends TrashPolicy> trashClass = conf.getClass(
-        "fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class);
-    TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf);
-    trash.initialize(conf, fs); // initialize TrashPolicy
-    return trash;
+    TrashPolicy trashPolicy = fs.getTrashPolicy(conf);
+    trashPolicy.initialize(conf, fs); // initialize TrashPolicy

Review Comment:
   What about requiring initialize() to be called in the fs instance, rather 
than here? That would let newer code access the policy directly.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java:
##########
@@ -629,10 +631,23 @@ public void testNonDefaultFS() throws IOException {
   public void testPluggableTrash() throws IOException {
     Configuration conf = new Configuration();
 
-    // Test plugged TrashPolicy
-    conf.setClass("fs.trash.classname", TestTrashPolicy.class, 
TrashPolicy.class);
-    Trash trash = new Trash(conf);
-    
assertTrue(trash.getTrashPolicy().getClass().equals(TestTrashPolicy.class));
+    {
+      // Test plugged TrashPolicy
+      conf.setClass("fs.trash.classname", TestTrashPolicy.class, 
TrashPolicy.class);
+      Trash trash = new Trash(conf);
+      assertInstanceOf(TestTrashPolicy.class, trash.getTrashPolicy());

Review Comment:
   Can you use AssertJ assertions? We can backport these to older branches 
without problems.
   ```
   Assertions.assertThat(trash.getTrashPolicy()).isInstanceOf(TestTrashPolicy.class);
   ```
   



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
+/**
+ * Test the {@link TrashPolicy} returned by {@link FileSystem#getTrashPolicy}
+ * results in a consistent trash behavior.
+ * <p>
+ *   Consistent trash behavior means that invoking the {@link TrashPolicy} 
methods in the
+ *   following order should not result in unexpected results such as files in 
trash that
+ *   will never be deleted by trash mechanism.
+ *   <ol>
+ *   <li>
+ *     {@link TrashPolicy#getDeletionInterval()} should return 0 before
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked.
+ *     The deletion interval should not return negative value. Zero value 
implies
+ *     that trash is disabled, which means {@link TrashPolicy#isEnabled()} 
should
+ *     return false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} should be 
implemented
+ *     and ensure that the subsequent {@link TrashPolicy} operations should 
work properly
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#isEnabled()} should return true after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked and
+ *     initialize the deletion interval to positive value. {@link 
TrashPolicy#isEnabled()}
+ *     should remain false if the {@link TrashPolicy#getDeletionInterval()}} 
returns 0 even after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} has been 
invoked.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should move a file or directory 
to the
+ *     current trash directory defined {@link 
TrashPolicy#getCurrentTrashDir(Path)}
+ *     if it's not already in the trash. This implies that
+ *     the {@link FileSystem#exists(Path)} should return false for the 
original path, but
+ *     should return true for the current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should return false if {@link 
TrashPolicy#isEnabled()} is false or
+ *     the path is already under {@link FileSystem#getTrashRoot(Path)}. There 
should not be any side
+ *     effect when {@link TrashPolicy#moveToTrash(Path)} returns false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#createCheckpoint()} should create rename the current 
trash directory to
+ *     another trash directory which is not equal to {@link 
TrashPolicy#getCurrentTrashDir(Path)}.
+ *     {@link TrashPolicy#createCheckpoint()} is a no-op if there is no 
current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpoint()} should cleanup the all the 
current
+ *     and checkpoint directories under {@link 
FileSystem#getTrashRoots(boolean)} created before
+ *     {@link TrashPolicy#getDeletionInterval()} minutes ago.
+ *     Note that the current trash directory {@link 
TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpointsImmediately()} should cleanup the 
checkpoint directories under
+ *     {@link FileSystem#getTrashRoots(boolean)} regardless of the checkpoint 
timestamp.
+ *     Note that the current trash directory {@link 
TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#getEmptier()} returns a runnable that will empty the 
trash.
+ *     The effective trash emptier interval should be [0, {@link 
TrashPolicy#getDeletionInterval()}].
+ *     Zero interval means that {@link Runnable#run()} is a no-op and returns 
immediately.
+ *     Non-zero trash emptier interval means that the {@link Runnable#run()} 
keeps running for
+ *     each interval (unless it is interrupted). For each interval, the trash 
emptier carry out the
+ *     following operations:
+ *     <ol>
+ *       It checks all the trash root directories through {@link 
FileSystem#getTrashRoots(boolean)} for all users.
+ *     </ol>
+ *     <ol>
+ *       For each trash root directory, it deletes the trash checkpoint 
directory with checkpoint time older than
+ *       {@link TrashPolicy#getDeletionInterval()}. Afterward, it creates a 
new trash checkpoint through
+ *       {@link TrashPolicy#createCheckpoint()}. Note that existing 
checkpoints which has not expired, will not
+ *       have any change.
+ *     </ol>
+ *   </li>
+ *   </ol>
+ * </p>
+ */
+public abstract class AbstractContractTrashTest extends 
AbstractFSContractTestBase {

Review Comment:
   There's a risk here that recycled (cached) filesystems still have the older 
trash policy. Have setup() invoke `FileSystem.closeAll()`.
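   
   The risk can be seen in a simplified model of an instance cache. This is
   only a sketch: `FsCacheSketch`, its `get` and its `closeAll` are
   hypothetical stand-ins, under the assumption that cached instances are
   looked up by URI and not by the configuration passed in.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a filesystem instance cache keyed by URI only. */
public class FsCacheSketch {
  /** A cached "filesystem" remembers the policy name it was created with. */
  public static final class Fs {
    public final String trashPolicy;

    Fs(String trashPolicy) {
      this.trashPolicy = trashPolicy;
    }
  }

  private static final Map<String, Fs> CACHE = new HashMap<>();

  /** Returns the cached instance if present; the new policy is then ignored. */
  public static Fs get(String uri, String trashPolicy) {
    return CACHE.computeIfAbsent(uri, u -> new Fs(trashPolicy));
  }

  /** Drops every cached instance so the next get() builds a fresh one. */
  public static void closeAll() {
    CACHE.clear();
  }

  public static void main(String[] args) {
    get("file:///", "old-policy");
    Fs recycled = get("file:///", "new-policy");
    System.out.println(recycled.trashPolicy); // still the old policy

    closeAll();
    Fs fresh = get("file:///", "new-policy");
    System.out.println(fresh.trashPolicy);
  }
}
```

   Clearing the cache in setup means each test case builds its filesystem
   from the configuration that the test itself created.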



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.conf.Configuration;

Review Comment:
   
   nit: import ordering. The groups should appear in this order:
   
   1. java.*
   2. non-apache
   3. org.apache
   4. static imports



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
+/**
+ * Test the {@link TrashPolicy} returned by {@link FileSystem#getTrashPolicy}
+ * results in a consistent trash behavior.
+ * <p>
+ *   Consistent trash behavior means that invoking the {@link TrashPolicy} 
methods in the
+ *   following order should not result in unexpected results such as files in 
trash that
+ *   will never be deleted by trash mechanism.
+ *   <ol>
+ *   <li>
+ *     {@link TrashPolicy#getDeletionInterval()} should return 0 before
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked.
+ *     The deletion interval should not return negative value. Zero value 
implies
+ *     that trash is disabled, which means {@link TrashPolicy#isEnabled()} 
should
+ *     return false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} should be 
implemented
+ *     and ensure that the subsequent {@link TrashPolicy} operations should 
work properly
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#isEnabled()} should return true after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked and
+ *     initialize the deletion interval to positive value. {@link 
TrashPolicy#isEnabled()}
+ *     should remain false if the {@link TrashPolicy#getDeletionInterval()}} 
returns 0 even after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} has been 
invoked.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should move a file or directory 
to the
+ *     current trash directory defined {@link 
TrashPolicy#getCurrentTrashDir(Path)}
+ *     if it's not already in the trash. This implies that
+ *     the {@link FileSystem#exists(Path)} should return false for the 
original path, but
+ *     should return true for the current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should return false if {@link 
TrashPolicy#isEnabled()} is false or
+ *     the path is already under {@link FileSystem#getTrashRoot(Path)}. There 
should not be any side
+ *     effect when {@link TrashPolicy#moveToTrash(Path)} returns false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#createCheckpoint()} should create rename the current 
trash directory to
+ *     another trash directory which is not equal to {@link 
TrashPolicy#getCurrentTrashDir(Path)}.
+ *     {@link TrashPolicy#createCheckpoint()} is a no-op if there is no 
current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpoint()} should cleanup the all the 
current
+ *     and checkpoint directories under {@link 
FileSystem#getTrashRoots(boolean)} created before
+ *     {@link TrashPolicy#getDeletionInterval()} minutes ago.
+ *     Note that the current trash directory {@link 
TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpointsImmediately()} should cleanup the 
checkpoint directories under
+ *     {@link FileSystem#getTrashRoots(boolean)} regardless of the checkpoint 
timestamp.
+ *     Note that the current trash directory {@link 
TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#getEmptier()} returns a runnable that will empty the 
trash.
+ *     The effective trash emptier interval should be [0, {@link 
TrashPolicy#getDeletionInterval()}].
+ *     Zero interval means that {@link Runnable#run()} is a no-op and returns 
immediately.
+ *     Non-zero trash emptier interval means that the {@link Runnable#run()} 
keeps running for
+ *     each interval (unless it is interrupted). For each interval, the trash 
emptier carry out the
+ *     following operations:
+ *     <ol>
+ *       It checks all the trash root directories through {@link 
FileSystem#getTrashRoots(boolean)} for all users.
+ *     </ol>
+ *     <ol>
+ *       For each trash root directory, it deletes the trash checkpoint 
directory with checkpoint time older than
+ *       {@link TrashPolicy#getDeletionInterval()}. Afterward, it creates a 
new trash checkpoint through
+ *       {@link TrashPolicy#createCheckpoint()}. Note that existing 
checkpoints which has not expired, will not
+ *       have any change.
+ *     </ol>
+ *   </li>
+ *   </ol>
+ * </p>
+ */
+public abstract class AbstractContractTrashTest extends 
AbstractFSContractTestBase {
+
+  @BeforeEach
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // Enable trash with 12 seconds deletes and 6 seconds checkpoints
+    conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds
+    conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds
+    return conf;
+  }
+
+  @AfterEach
+  @Override
+  public void teardown() throws Exception {
+    final FileSystem fs = getFileSystem();
+    Collection<FileStatus> trashRoots = fs.getTrashRoots(true);
+    for (FileStatus trashRoot : trashRoots) {
+      fs.delete(trashRoot.getPath(), true);
+    }
+    super.teardown();
+  }
+
+  @Test
+  public void testTrashPolicy() throws Throwable {
+    final FileSystem fs = getFileSystem();
+
+    // TrashPolicy needs to be initialized with non-zero deletion interval 
before
+    // TrashPolicy#isEnabled returns true
+    final TrashPolicy trashPolicy = fs.getTrashPolicy(getContract().getConf());
+    assertFalse(trashPolicy.isEnabled());
+    assertEquals(0, trashPolicy.getDeletionInterval());
+    assertFalse(trashPolicy.moveToTrash(new Path("randomFile")));
+    trashPolicy.initialize(getContract().getConf(), fs);
+    assertTrue(trashPolicy.isEnabled());
+    assertTrue(trashPolicy.getDeletionInterval() > 0);
+
+    // Check that the current directory is still empty even if checkpoints 
operation is run
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpointsImmediately();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+
+    // TrashPolicy#moveToTrash should move the file to the current trash 
directory
+    Path base = methodPath();
+    mkdirs(base);
+    Path fileToDelete = new Path(base, "testFile");
+    byte[] data = ContractTestUtils.dataset(256, 'a', 'z');
+    ContractTestUtils.writeDataset(fs, fileToDelete, data, data.length, 1024 * 
1024, false);
+
+    assertTrue(trashPolicy.moveToTrash(fileToDelete));
+    assertPathExists("trash current directory should exist after moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    Path expectedCurrentTrashPath = 
Path.mergePaths(trashPolicy.getCurrentTrashDir(fileToDelete), fileToDelete);;
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+    // Calling TrashPolicy#moveToTrash on the key in path should return false
+    // and the file remains unchanged
+    assertFalse(trashPolicy.moveToTrash(expectedCurrentTrashPath));
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // Calling TrashPolicy#deleteCheckpoint or 
TrashPolicy#deleteCheckpointImmediately has no effect on the
+    // current trash directory.
+    trashPolicy.deleteCheckpoint();
+    trashPolicy.deleteCheckpointsImmediately();
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // TrashPolicy#createCheckpoint rename the current trash directory to a 
new directory
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist after 
checkpoint",
+        trashPolicy.getCurrentTrashDir(fileToDelete));
+    assertPathDoesNotExist("the path under current trash directory should not 
exist after checkpoint",
+        expectedCurrentTrashPath);
+    FileStatus[] trashRootChildren = ContractTestUtils.listChildren(fs, 
fs.getTrashRoot(fileToDelete));
+    assertEquals(1, trashRootChildren.length);
+    FileStatus trashCheckpointDir = trashRootChildren[0];
+    Path expectedCheckpointTrashPath = 
Path.mergePaths(trashCheckpointDir.getPath(), fileToDelete);
+    ContractTestUtils.verifyFileContents(fs, expectedCheckpointTrashPath, 
data);
+
+    // TrashPolicy#deleteCheckpoint
+    Thread.sleep(12000); // This should be the time set as deletion interval
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("the path under checkpoint directory should be 
deleted",
+        expectedCheckpointTrashPath);
+    trashRootChildren = ContractTestUtils.listChildren(fs, 
fs.getTrashRoot(fileToDelete));
+    assertEquals(0, trashRootChildren.length);

Review Comment:
   An AssertJ assertion on the array size is better, as it will list the 
contents of the array on failure.
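   
   As a rough illustration of the difference, the stdlib-only sketch below
   builds the kind of failure message that lists the array contents.
   `ArrayAssertSketch` and `assertSize` are hypothetical helpers, not AssertJ,
   and the trash path is made up for the example. In the test itself the
   equivalent would be something along the lines of
   `Assertions.assertThat(trashRootChildren).isEmpty()`.

```java
import java.util.Arrays;

/** Hypothetical helper mimicking an assertion that reports array contents. */
public class ArrayAssertSketch {
  /** Fails with a message that names the array and lists its entries. */
  public static void assertSize(String what, Object[] actual, int expected) {
    if (actual.length != expected) {
      throw new AssertionError(what + ": expected size " + expected
          + " but was " + actual.length + " in " + Arrays.toString(actual));
    }
  }

  public static void main(String[] args) {
    // Made-up leftover entry, standing in for a checkpoint directory.
    String[] children = {"/user/alice/.Trash/250101120000"};
    try {
      assertSize("trash root children", children, 0);
    } catch (AssertionError e) {
      // The message shows which entry was left behind, not just the count.
      System.out.println(e.getMessage());
    }
  }
}
```

   With a plain length comparison, the failure would only report the two
   numbers, and the leftover entry would have to be found by rerunning the
   test.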



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractTrashTest.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
+/**
+ * Test the {@link TrashPolicy} returned by {@link FileSystem#getTrashPolicy}
+ * results in a consistent trash behavior.
+ * <p>
+ *   Consistent trash behavior means that invoking the {@link TrashPolicy} 
methods in the
+ *   following order should not result in unexpected results such as files in 
trash that
+ *   will never be deleted by trash mechanism.
+ *   <ol>
+ *   <li>
+ *     {@link TrashPolicy#getDeletionInterval()} should return 0 before
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked.
+ *     The deletion interval should not return negative value. Zero value 
implies
+ *     that trash is disabled, which means {@link TrashPolicy#isEnabled()} 
should
+ *     return false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} should be 
implemented
+ *     and ensure that the subsequent {@link TrashPolicy} operations should 
work properly
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#isEnabled()} should return true after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} is invoked and
+ *     initialize the deletion interval to positive value. {@link 
TrashPolicy#isEnabled()}
+ *     should remain false if the {@link TrashPolicy#getDeletionInterval()}} 
returns 0 even after
+ *     {@link TrashPolicy#initialize(Configuration, FileSystem)} has been 
invoked.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should move a file or directory 
to the
+ *     current trash directory defined {@link 
TrashPolicy#getCurrentTrashDir(Path)}
+ *     if it's not already in the trash. This implies that
+ *     the {@link FileSystem#exists(Path)} should return false for the 
original path, but
+ *     should return true for the current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#moveToTrash(Path)} should return false if {@link 
TrashPolicy#isEnabled()} is false or
+ *     the path is already under {@link FileSystem#getTrashRoot(Path)}. There 
should not be any side
+ *     effect when {@link TrashPolicy#moveToTrash(Path)} returns false.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#createCheckpoint()} should rename the current trash directory to
+ *     another trash directory which is not equal to {@link TrashPolicy#getCurrentTrashDir(Path)}.
+ *     {@link TrashPolicy#createCheckpoint()} is a no-op if there is no current trash directory.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpoint()} should clean up all the current
+ *     and checkpoint directories under {@link FileSystem#getTrashRoots(boolean)} that were
+ *     created more than {@link TrashPolicy#getDeletionInterval()} minutes ago.
+ *     Note that the current trash directory {@link TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#deleteCheckpointsImmediately()} should clean up the checkpoint directories under
+ *     {@link FileSystem#getTrashRoots(boolean)} regardless of the checkpoint timestamp.
+ *     Note that the current trash directory {@link TrashPolicy#getCurrentTrashDir()} should not be deleted.
+ *   </li>
+ *   <li>
+ *     {@link TrashPolicy#getEmptier()} returns a runnable that will empty the trash.
+ *     The effective trash emptier interval should be in the range
+ *     [0, {@link TrashPolicy#getDeletionInterval()}].
+ *     A zero interval means that {@link Runnable#run()} is a no-op and returns immediately.
+ *     A non-zero trash emptier interval means that {@link Runnable#run()} keeps running for
+ *     each interval (unless it is interrupted). For each interval, the trash emptier carries out the
+ *     following operations:
+ *     <ol>
+ *       <li>
+ *         It checks all the trash root directories through
+ *         {@link FileSystem#getTrashRoots(boolean)} for all users.
+ *       </li>
+ *       <li>
+ *         For each trash root directory, it deletes the trash checkpoint directories with a
+ *         checkpoint time older than {@link TrashPolicy#getDeletionInterval()}. Afterward, it
+ *         creates a new trash checkpoint through {@link TrashPolicy#createCheckpoint()}.
+ *         Existing checkpoints that have not expired are left unchanged.
+ *       </li>
+ *     </ol>
+ *   </li>
+ *   </ol>
+ * </p>
+ */
+public abstract class AbstractContractTrashTest extends 
AbstractFSContractTestBase {
+
+  @BeforeEach
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // Enable trash with 12 seconds deletes and 6 seconds checkpoints
+    conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds
+    conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds
+    return conf;
+  }
+
+  @AfterEach
+  @Override
+  public void teardown() throws Exception {
+    final FileSystem fs = getFileSystem();
+    Collection<FileStatus> trashRoots = fs.getTrashRoots(true);
+    for (FileStatus trashRoot : trashRoots) {
+      fs.delete(trashRoot.getPath(), true);
+    }
+    super.teardown();
+  }
+
+  @Test
+  public void testTrashPolicy() throws Throwable {
+    final FileSystem fs = getFileSystem();
+
+    // TrashPolicy needs to be initialized with non-zero deletion interval 
before
+    // TrashPolicy#isEnabled returns true
+    final TrashPolicy trashPolicy = fs.getTrashPolicy(getContract().getConf());
+    assertFalse(trashPolicy.isEnabled());
+    assertEquals(0, trashPolicy.getDeletionInterval());
+    assertFalse(trashPolicy.moveToTrash(new Path("randomFile")));
+    trashPolicy.initialize(getContract().getConf(), fs);
+    assertTrue(trashPolicy.isEnabled());
+    assertTrue(trashPolicy.getDeletionInterval() > 0);
+
+    // Check that the current trash directory still does not exist even after
+    // the checkpoint operations are run
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    trashPolicy.deleteCheckpointsImmediately();
+    assertPathDoesNotExist("trash current directory should not exist before 
moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+
+    // TrashPolicy#moveToTrash should move the file to the current trash 
directory
+    Path base = methodPath();
+    mkdirs(base);
+    Path fileToDelete = new Path(base, "testFile");
+    byte[] data = ContractTestUtils.dataset(256, 'a', 'z');
+    ContractTestUtils.writeDataset(fs, fileToDelete, data, data.length, 1024 * 
1024, false);
+
+    assertTrue(trashPolicy.moveToTrash(fileToDelete));
+    assertPathExists("trash current directory should exist after moveToTrash",
+        trashPolicy.getCurrentTrashDir());
+    Path expectedCurrentTrashPath =
+        Path.mergePaths(trashPolicy.getCurrentTrashDir(fileToDelete), fileToDelete);
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+    // Calling TrashPolicy#moveToTrash on a path that is already in the trash should
+    // return false and leave the file unchanged
+    assertFalse(trashPolicy.moveToTrash(expectedCurrentTrashPath));
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // Calling TrashPolicy#deleteCheckpoint or TrashPolicy#deleteCheckpointsImmediately
+    // has no effect on the current trash directory.
+    trashPolicy.deleteCheckpoint();
+    trashPolicy.deleteCheckpointsImmediately();
+    ContractTestUtils.verifyFileContents(fs, expectedCurrentTrashPath, data);
+
+    // TrashPolicy#createCheckpoint renames the current trash directory to a new directory
+    trashPolicy.createCheckpoint();
+    assertPathDoesNotExist("trash current directory should not exist after 
checkpoint",
+        trashPolicy.getCurrentTrashDir(fileToDelete));
+    assertPathDoesNotExist("the path under current trash directory should not 
exist after checkpoint",
+        expectedCurrentTrashPath);
+    FileStatus[] trashRootChildren = ContractTestUtils.listChildren(fs, 
fs.getTrashRoot(fileToDelete));
+    assertEquals(1, trashRootChildren.length);
+    FileStatus trashCheckpointDir = trashRootChildren[0];
+    Path expectedCheckpointTrashPath = 
Path.mergePaths(trashCheckpointDir.getPath(), fileToDelete);
+    ContractTestUtils.verifyFileContents(fs, expectedCheckpointTrashPath, 
data);
+
+    // TrashPolicy#deleteCheckpoint
+    Thread.sleep(12000); // This should be the time set as deletion interval
+    trashPolicy.deleteCheckpoint();
+    assertPathDoesNotExist("the path under checkpoint directory should be 
deleted",
+        expectedCheckpointTrashPath);
+    trashRootChildren = ContractTestUtils.listChildren(fs, 
fs.getTrashRoot(fileToDelete));
+    assertEquals(0, trashRootChildren.length);
+  }
+
+  @Test
+  public void testEmptier() throws Throwable {
+    // Adapted from TestTrash#testTrashEmptier.
+    final FileSystem fs = getFileSystem();
+
+    // Start Emptier in background
+    final TrashPolicy trashPolicy = fs.getTrashPolicy(getContract().getConf());
+    trashPolicy.initialize(getContract().getConf(), fs);
+
+    Runnable emptier = trashPolicy.getEmptier();
+    Thread emptierThread = new Thread(emptier);
+    emptierThread.start();
+
+    // First create a new directory with mkdirs
+    Path base = methodPath();
+    mkdirs(base);
+    int fileIndex = 0;
+    Set<String> checkpoints = new HashSet<>();
+    while (true) {
+      // Create a file with a new name
+      Path myFile = new Path(base, "myFile" + fileIndex);
+      ContractTestUtils.writeTextFile(fs, myFile, "file" + fileIndex, false);
+      fileIndex++;
+
+      // Move the files to trash
+      assertTrue(trashPolicy.moveToTrash(myFile));
+
+      Path trashDir = trashPolicy.getCurrentTrashDir(myFile);
+      FileStatus[] files = fs.listStatus(trashDir.getParent());
+      // Scan files in .Trash and add them to set of checkpoints
+      for (FileStatus file : files) {
+        String fileName = file.getPath().getName();
+        checkpoints.add(fileName);
+      }
+      // If checkpoints has 4 objects it is Current + 3 checkpoint directories
+      if (checkpoints.size() == 4) {
+        // The actual contents should be smaller since the last checkpoint
+        // should've been deleted and Current might not have been recreated yet
+        assertTrue(checkpoints.size() > files.length);

Review Comment:
   Use an AssertJ assertion on the size here, so that a failure reports the actual values.



##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md:
##########
@@ -636,6 +636,36 @@ The path does not have to exist, but the path does need to 
be valid and reconcil
 * The path returned is a directory
 
 
+###  `TrashPolicy getTrashPolicy(Configuration conf)`

Review Comment:
   You need to define what a trash policy is and does. Maybe add a new file,
   trashpolicy.md.

   Trash policies are part of the public API (Hive and Iceberg use them).
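
   As a possible starting point for that documentation, here is a minimal sketch of the
   interval and expiry rules that the contract test javadoc in this PR describes. It is
   an illustration only: the class `TrashCheckpointSketch` and its methods are
   hypothetical and are not Hadoop APIs, and it assumes the interval is configured in
   (possibly fractional) minutes, as the test's "0.2" (12 seconds) setting suggests.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of the Hadoop API. */
final class TrashCheckpointSketch {

  private static final long MSECS_PER_MINUTE = 60_000L;

  /** Convert an interval given in (possibly fractional) minutes to milliseconds. */
  static long intervalMillis(double minutes) {
    return Math.round(minutes * MSECS_PER_MINUTE);
  }

  /** Assumption: a deletion interval of zero means that trash is disabled. */
  static boolean isEnabled(long deletionIntervalMillis) {
    return deletionIntervalMillis > 0;
  }

  /**
   * Return the checkpoint timestamps that are older than the deletion interval.
   * Checkpoints that have not expired are left out of the result, i.e. untouched.
   */
  static List<Long> expired(List<Long> checkpointTimes, long now, long deletionIntervalMillis) {
    List<Long> result = new ArrayList<>();
    for (long created : checkpointTimes) {
      if (now - created > deletionIntervalMillis) {
        result.add(created);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long interval = intervalMillis(0.2); // the test configuration: 0.2 minutes
    System.out.println("deletion interval (ms): " + interval);
    System.out.println("enabled: " + isEnabled(interval));
  }
}
```

   With the test's values, a checkpoint only becomes eligible for deletion once more
   than 12 seconds have passed since it was created, which is why the test sleeps
   before calling `deleteCheckpoint()`.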





> Introduce getTrashPolicy to FileSystem API
> ------------------------------------------
>
>                 Key: HADOOP-19217
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19217
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs
>            Reporter: Ivan Andika
>            Priority: Major
>              Labels: pull-request-available
>
> Hadoop FileSystem is aware of multiple FileSystem implementations
> (e.g. a client is aware of both the hdfs:// and ofs:// protocols).
> However, it seems that the Hadoop TrashPolicy currently remains the same
> regardless of the URI scheme. The TrashPolicy is governed by the
> "fs.trash.classname" configuration and stays the same regardless of the
> FileSystem implementation. For example, HDFS defaults to TrashPolicyDefault
> and Ozone defaults to TrashPolicyOzone, but only one will be picked since
> the configuration of one will be overwritten by the other.
> Therefore, I propose to tie the TrashPolicy implementation to each FileSystem 
> implementation by introducing a new FileSystem#getTrashPolicy interface. 
> TrashPolicy#getInstance can call FileSystem#getTrashPolicy to get the 
> appropriate TrashPolicy.


