This is an automated email from the ASF dual-hosted git repository.

tomscut pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8b564569f18 HDFS-17818. Fix serial fsimage transfer during checkpoint 
with multiple namenodes (#7862)
8b564569f18 is described below

commit 8b564569f18906a11f91ab3f55a467edc36e9adf
Author: caozhiqiang <[email protected]>
AuthorDate: Fri Nov 28 17:48:15 2025 +0800

    HDFS-17818. Fix serial fsimage transfer during checkpoint with multiple 
namenodes (#7862)
    
    Reviewed-by: Tao Li <[email protected]>
    Signed-off-by: Tao Li <[email protected]>
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  3 ++
 .../hdfs/server/namenode/CheckpointConf.java       | 13 +++++
 .../server/namenode/ha/StandbyCheckpointer.java    |  3 +-
 .../src/main/resources/hdfs-default.xml            | 11 ++++
 .../server/namenode/ha/TestStandbyCheckpoints.java | 62 +++++++++++++++++++++-
 5 files changed, 90 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c72bc50c0fb..df9e3907bda 100755
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -259,6 +259,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 1000000;
   public static final String  DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY = 
"dfs.namenode.checkpoint.max-retries";
   public static final int     DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
+  public static final String  
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY =
+      "dfs.namenode.checkpoint.parallel.upload.enabled";
+  public static final boolean 
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT = false;
   public static final String  
DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_KEY = 
"dfs.namenode.missing.checkpoint.periods.before.shutdown";
   public static final int     
DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_DEFAULT = 3;
   public static final String  DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY =
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
index 4df170d7716..a5f8049c950 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
@@ -54,6 +54,12 @@ public class CheckpointConf {
   */
   private double quietMultiplier;
 
+  /**
+   * Whether to enable the standby namenode to upload the fsimage to multiple other 
namenodes in
+   * parallel, in a cluster with observer namenodes.
+   */
+  private final boolean parallelUploadEnabled;
+
   public CheckpointConf(Configuration conf) {
     checkpointCheckPeriod = conf.getTimeDuration(
         DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
@@ -68,6 +74,9 @@ public CheckpointConf(Configuration conf) {
     legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
     quietMultiplier = 
conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
       DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
+    parallelUploadEnabled = conf.getBoolean(
+        DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY,
+        DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT);
     warnForDeprecatedConfigs(conf);
   }
   
@@ -106,4 +115,8 @@ public String getLegacyOivImageDir() {
   public double getQuietPeriod() {
     return this.checkpointPeriod * this.quietMultiplier;
   }
+
+  public boolean isParallelUploadEnabled() {
+    return parallelUploadEnabled;
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index e93a384c99d..0488a17886c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -249,7 +249,8 @@ private void doCheckpoint() throws InterruptedException, 
IOException {
     // Do this in a separate thread to avoid blocking transition to active, 
but don't allow more
     // than the expected number of tasks to run or queue up
     // See HDFS-4816
-    ExecutorService executor = new ThreadPoolExecutor(0, 
activeNNAddresses.size(), 100,
+    int poolSize = checkpointConf.isParallelUploadEnabled() ? 
activeNNAddresses.size() : 0;
+    ExecutorService executor = new ThreadPoolExecutor(poolSize, 
activeNNAddresses.size(), 100,
         TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
         uploadThreadFactory);
     // for right now, just match the upload to the nn address by convention. 
There is no need to
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b9d8b67dc12..2b889dd2adc 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1397,6 +1397,17 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.checkpoint.parallel.upload.enabled</name>
+  <value>false</value>
+  <description>
+  If true, the CheckpointNode will upload the checkpoint image to multiple 
other
+  NameNodes in parallel, in a cluster with observer NameNodes. You should
+  make sure the network bandwidth is sufficient.
+  If false, the fsimage will be uploaded serially to multiple namenodes.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.checkpoint.check.quiet-multiplier</name>
   <value>1.5</value>
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index e010193bf3e..25926f695af 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -484,7 +484,67 @@ public Boolean get() {
     // Assert that former active did not accept the canceled checkpoint file.
     assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
   }
-  
+
+  /**
+   * Tests that the standby namenode uploads the fsimage to multiple other namenodes in 
parallel, in a
+   * cluster with observer namenodes.
+   */
+  @Test
+  @Timeout(value = 300)
+  public void testCheckpointParallelUpload() throws Exception {
+    // Set dfs.namenode.checkpoint.txns differently on the first NN to avoid it
+    // doing checkpoint when it becomes a standby
+    cluster.getConfiguration(0).setInt(
+        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);
+
+    // don't compress, we want a big image
+    for (int i = 0; i < NUM_NNS; i++) {
+      cluster.getConfiguration(i).setBoolean(
+          DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
+    }
+
+    // Throttle SBN upload to make it hang during upload to ANN, and enable 
parallel upload fsimage.
+    for (int i = 1; i < NUM_NNS; i++) {
+      cluster.getConfiguration(i).setLong(
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
+      cluster.getConfiguration(i).setBoolean(
+          DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY, 
true);
+    }
+    for (int i = 0; i < NUM_NNS; i++) {
+      cluster.restartNameNode(i);
+    }
+
+    // update references to each of the nns
+    setNNs();
+
+    cluster.transitionToActive(0);
+
+    doEdits(0, 100);
+
+    for (int i = 1; i < NUM_NNS; i++) {
+      HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
+      HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
+    }
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        int transferThreadCount = 0;
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        ThreadInfo[] threads = threadBean.getThreadInfo(
+            threadBean.getAllThreadIds(), 1);
+        for (ThreadInfo thread: threads) {
+          if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
+            transferThreadCount++;
+          }
+        }
+        return transferThreadCount == NUM_NNS - 1;
+      }
+    }, 1000, 30000);
+  }
+
   /**
    * Make sure that clients will receive StandbyExceptions even when a
    * checkpoint is in progress on the SBN, and therefore the 
StandbyCheckpointer


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to