This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15a5f53  HDFS-15640. Add diff threshold to FedBalance. Contributed by Jinglun.
15a5f53 is described below

commit 15a5f5367366fdd76933d0ff6499363fcbc8873e
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Tue Oct 27 10:41:10 2020 +0800

    HDFS-15640. Add diff threshold to FedBalance. Contributed by Jinglun.
---
 .../hadoop/tools/fedbalance/DistCpProcedure.java   | 43 +++++++++++++++++-----
 .../apache/hadoop/tools/fedbalance/FedBalance.java | 22 ++++++++++-
 .../hadoop/tools/fedbalance/FedBalanceContext.java | 21 +++++++++++
 .../hadoop/tools/fedbalance/FedBalanceOptions.java | 11 ++++++
 .../src/site/markdown/HDFSFederationBalance.md     |  1 +
 .../tools/fedbalance/TestDistCpProcedure.java      | 34 ++++++++++++++++-
 6 files changed, 119 insertions(+), 13 deletions(-)

diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
index 223b777..33d37be 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
@@ -89,6 +89,8 @@ public class DistCpProcedure extends BalanceProcedure {
   private boolean forceCloseOpenFiles;
   /* Disable write by setting the mount point readonly. */
   private boolean useMountReadOnly;
+  /* The threshold of diff entries. */
+  private int diffThreshold;
 
   private FsPermission fPerm; // the permission of the src.
   private AclStatus acl; // the acl of the src.
@@ -134,6 +136,7 @@ public class DistCpProcedure extends BalanceProcedure {
     this.bandWidth = context.getBandwidthLimit();
     this.forceCloseOpenFiles = context.getForceCloseOpenFiles();
     this.useMountReadOnly = context.getUseMountReadOnly();
+    this.diffThreshold = context.getDiffThreshold();
     srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
     dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
   }
@@ -227,12 +230,8 @@ public class DistCpProcedure extends BalanceProcedure {
       } else {
         throw new RetryException(); // wait job complete.
       }
-    } else if (!verifyDiff()) {
-      if (!verifyOpenFiles() || forceCloseOpenFiles) {
-        updateStage(Stage.DISABLE_WRITE);
-      } else {
-        throw new RetryException();
-      }
+    } else if (diffDistCpStageDone()) {
+      updateStage(Stage.DISABLE_WRITE);
     } else {
       submitDiffDistCp();
     }
@@ -372,14 +371,38 @@ public class DistCpProcedure extends BalanceProcedure {
   }
 
   /**
-   * Verify whether the src has changed since CURRENT_SNAPSHOT_NAME snapshot.
+   * Check whether the conditions are satisfied for moving to the next stage.
+   * If the diff entries size is no greater than the threshold and the open
+   * files could be force closed or there is no open file, then moving to the
+   * next stage.
+   *
+   * @return true if moving to the next stage. false if the conditions are not
+   * satisfied.
+   * @throws RetryException if the conditions are not satisfied and the diff
+   * size is under the given threshold scope.
+   */
+  @VisibleForTesting
+  boolean diffDistCpStageDone() throws IOException, RetryException {
+    int diffSize = getDiffSize();
+    if (diffSize <= diffThreshold) {
+      if (forceCloseOpenFiles || !verifyOpenFiles()) {
+        return true;
+      } else {
+        throw new RetryException();
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get number of the diff entries.
    *
-   * @return true if the src has changed.
+   * @return number of the diff entries.
    */
-  private boolean verifyDiff() throws IOException {
+  private int getDiffSize() throws IOException {
     SnapshotDiffReport diffReport =
         srcFs.getSnapshotDiffReport(src, CURRENT_SNAPSHOT_NAME, "");
-    return diffReport.getDiffList().size() > 0;
+    return diffReport.getDiffList().size();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
index ca6b0df..c850798 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
@@ -51,6 +51,7 @@ import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.TRASH;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DELAY_DURATION;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.CLI_OPTIONS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DIFF_THRESHOLD;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
 
 /**
@@ -91,6 +92,8 @@ public class FedBalance extends Configured implements Tool {
     private TrashOption trashOpt = TrashOption.TRASH;
     /* Specify the duration(millie seconds) when the procedure needs retry. */
     private long delayDuration = TimeUnit.SECONDS.toMillis(1);
+    /* Specify the threshold of diff entries. */
+    private int diffThreshold = 0;
     /* The source input. This specifies the source path. */
     private final String inputSrc;
     /* The dst input. This specifies the dst path. */
@@ -156,6 +159,15 @@ public class FedBalance extends Configured implements Tool {
     }
 
     /**
+     * Specify the threshold of diff entries.
+     * @param value the threshold of a fast distcp.
+     */
+    public Builder setDiffThreshold(int value) {
+      this.diffThreshold = value;
+      return this;
+    }
+
+    /**
      * Build the balance job.
      */
     public BalanceJob build() throws IOException {
@@ -172,7 +184,8 @@ public class FedBalance extends Configured implements Tool {
             .setForceCloseOpenFiles(forceCloseOpen)
             .setUseMountReadOnly(routerCluster).setMapNum(map)
             .setBandwidthLimit(bandwidth).setTrash(trashOpt)
-            .setDelayDuration(delayDuration).build();
+            .setDelayDuration(delayDuration)
+            .setDiffThreshold(diffThreshold).build();
       } else { // normal federation cluster.
         Path src = new Path(inputSrc);
         if (src.toUri().getAuthority() == null) {
@@ -181,7 +194,8 @@ public class FedBalance extends Configured implements Tool {
         context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
             .setForceCloseOpenFiles(forceCloseOpen)
             .setUseMountReadOnly(routerCluster).setMapNum(map)
-            .setBandwidthLimit(bandwidth).setTrash(trashOpt).build();
+            .setBandwidthLimit(bandwidth).setTrash(trashOpt)
+            .setDiffThreshold(diffThreshold).build();
       }
 
       LOG.info(context.toString());
@@ -290,6 +304,10 @@ public class FedBalance extends Configured implements Tool {
       builder.setDelayDuration(
           Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt())));
     }
+    if (command.hasOption(DIFF_THRESHOLD.getOpt())) {
+      builder.setDiffThreshold(Integer.parseInt(
+          command.getOptionValue(DIFF_THRESHOLD.getOpt())));
+    }
     if (command.hasOption(TRASH.getOpt())) {
       String val = command.getOptionValue(TRASH.getOpt());
       if (val.equalsIgnoreCase("skip")) {
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
index 56be7db..f4f5700 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
@@ -54,6 +54,8 @@ public class FedBalanceContext implements Writable {
   private TrashOption trashOpt;
   /* How long will the procedures be delayed. */
   private long delayDuration;
+  /* The threshold of diff entries. */
+  private int diffThreshold;
 
   private Configuration conf;
 
@@ -91,6 +93,10 @@ public class FedBalanceContext implements Writable {
     return bandwidthLimit;
   }
 
+  public int getDiffThreshold() {
+    return diffThreshold;
+  }
+
   public TrashOption getTrashOpt() {
     return trashOpt;
   }
@@ -107,6 +113,7 @@ public class FedBalanceContext implements Writable {
     out.writeInt(bandwidthLimit);
     out.writeInt(trashOpt.ordinal());
     out.writeLong(delayDuration);
+    out.writeInt(diffThreshold);
   }
 
   @Override
@@ -122,6 +129,7 @@ public class FedBalanceContext implements Writable {
     bandwidthLimit = in.readInt();
     trashOpt = TrashOption.values()[in.readInt()];
     delayDuration = in.readLong();
+    diffThreshold = in.readInt();
   }
 
   @Override
@@ -146,6 +154,7 @@ public class FedBalanceContext implements Writable {
         .append(bandwidthLimit, bc.bandwidthLimit)
         .append(trashOpt, bc.trashOpt)
         .append(delayDuration, bc.delayDuration)
+        .append(diffThreshold, bc.diffThreshold)
         .isEquals();
   }
 
@@ -161,6 +170,7 @@ public class FedBalanceContext implements Writable {
         .append(bandwidthLimit)
         .append(trashOpt)
         .append(delayDuration)
+        .append(diffThreshold)
         .build();
   }
 
@@ -180,6 +190,7 @@ public class FedBalanceContext implements Writable {
     builder.append(", map=").append(mapNum);
     builder.append(", bandwidth=").append(bandwidthLimit);
     builder.append(", delayDuration=").append(delayDuration);
+    builder.append(", diffThreshold=").append(diffThreshold);
     return builder.toString();
   }
 
@@ -194,6 +205,7 @@ public class FedBalanceContext implements Writable {
     private int bandwidthLimit;
     private TrashOption trashOpt;
     private long delayDuration;
+    private int diffThreshold;
 
     /**
      * This class helps building the FedBalanceContext.
@@ -264,6 +276,14 @@ public class FedBalanceContext implements Writable {
     }
 
     /**
+     * Specify the threshold of diff entries.
+     */
+    public Builder setDiffThreshold(int value) {
+      this.diffThreshold = value;
+      return this;
+    }
+
+    /**
      * Build the FedBalanceContext.
      *
      * @return the FedBalanceContext obj.
@@ -280,6 +300,7 @@ public class FedBalanceContext implements Writable {
       context.bandwidthLimit = this.bandwidthLimit;
       context.trashOpt = this.trashOpt;
       context.delayDuration = this.delayDuration;
+      context.diffThreshold = this.diffThreshold;
       return context;
     }
   }
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java
index 71a7d9d..d7be6a8 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java
@@ -72,6 +72,17 @@ public final class FedBalanceOptions {
           + " when it waits for the distcp job to finish.");
 
   /**
+   * Specify the threshold of diff entries.
+   */
+  final static Option DIFF_THRESHOLD = new Option("diffThreshold", true,
+      "This specifies the threshold of the diff entries that used in"
+          + " incremental copy stage. If the diff entries size is no greater"
+          + " than this threshold and the open files check is satisfied"
+          + "(no open files or force close all open files), the fedBalance will"
+          + " go to the final round of distcp. Default value is 0, that means"
+          + " waiting until there is no diff.");
+
+  /**
    * Move the source path to trash after all the data are sync to target, or
    * delete the source directly, or skip both trash and deletion.
    */
diff --git a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md
index ff42eaf..03e6e60 100644
--- a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md
+++ b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md
@@ -101,6 +101,7 @@ Command `submit` has 5 options:
 | -bandwidth | Specify bandwidth per map in MB. | 10 |
 | -delay | Specify the delayed duration(millie seconds) when the job needs to retry. | 1000 |
 | -moveToTrash | This options has 3 values: `trash` (move the source path to trash), `delete` (delete the source path directly) and `skip` (skip both trash and deletion). By default the server side trash interval is used. If the trash is disabled in the server side, the default trash interval 60 minutes is used. | trash |
+| -diffThreshold | Specify the threshold of the diff entries that used in incremental copy stage. If the diff entries size is no greater than the threshold and the open files check is satisfied(no open files or force close all open files), the fedBalance will go to the final round of distcp. Setting to 0 means waiting until there is no diff.| 0 |
 
 ### Configuration Options
 --------------------
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
index ec565c3..ea5a8a0 100644
--- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
@@ -171,6 +171,33 @@ public class TestDistCpProcedure {
     cleanup(fs, new Path(testRoot));
   }
 
+  @Test
+  public void testDiffThreshold() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT, 10);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+        () -> dcProcedure.initDistCp());
+    // Test distcp with diff entries number no greater than threshold.
+    Path lastPath = new Path(src, "a");
+    for (int i = 0; i < 5; i++) {
+      Path newPath = new Path(src, "a-" + i);
+      fs.rename(lastPath, newPath);
+      lastPath = newPath;
+      assertTrue(dcProcedure.diffDistCpStageDone());
+      executeProcedure(dcProcedure, Stage.DISABLE_WRITE,
+          () -> dcProcedure.diffDistCp());
+    }
+    cleanup(fs, new Path(testRoot));
+  }
+
   @Test(timeout = 30000)
   public void testDiffDistCp() throws Exception {
     String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
@@ -351,9 +378,14 @@ public class TestDistCpProcedure {
   }
 
   private FedBalanceContext buildContext(Path src, Path dst, String mount) {
+    return buildContext(src, dst, mount, 0);
+  }
+
+  private FedBalanceContext buildContext(Path src, Path dst, String mount,
+      int diffThreshold) {
     return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10)
         .setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000)
-        .build();
+        .setDiffThreshold(diffThreshold).build();
   }
 
   interface Call {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to