This is an automated email from the ASF dual-hosted git repository.

jing9 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 643dfd6  HDFS-15842. HDFS mover to emit metrics. (#2738)
643dfd6 is described below

commit 643dfd60e442b35fc9bb2dbf32fcdf28c3dd2f58
Author: LeonGao <lia...@uber.com>
AuthorDate: Sat Jun 19 15:39:46 2021 -0700

    HDFS-15842. HDFS mover to emit metrics. (#2738)
---
 .../hadoop/hdfs/server/balancer/Dispatcher.java    |  1 +
 .../hdfs/server/balancer/NameNodeConnector.java    |  9 ++-
 .../org/apache/hadoop/hdfs/server/mover/Mover.java | 21 ++++++
 .../hadoop/hdfs/server/mover/MoverMetrics.java     | 83 ++++++++++++++++++++++
 .../hadoop/hdfs/server/mover/package-info.java     | 27 +++++++
 .../apache/hadoop/hdfs/server/mover/TestMover.java | 57 +++++++++++++++
 6 files changed, 196 insertions(+), 2 deletions(-)
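
Note: the patch registers a per-blockpool metrics source with the Hadoop metrics2 system, so the new counters can also be read over JMX while the Mover is running. Below is a minimal probe sketch; the MBean name pattern ("Hadoop:service=Mover,name=Mover-<blockpoolID>"), the example block pool id, and the assumption that it runs inside the Mover JVM (a remote reader would go through a JMXConnector instead) are illustrative guesses based on how DefaultMetricsSystem usually exposes registered sources, not something taken from this patch.

    // Hypothetical probe; ObjectName pattern and block pool id are assumptions.
    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class MoverMetricsProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
            "Hadoop:service=Mover,name=Mover-BP-1-127.0.0.1-1000");
        // Attribute names follow the counter/gauge names defined in MoverMetrics.
        System.out.println("BlocksScheduled = "
            + mbs.getAttribute(name, "BlocksScheduled"));
        System.out.println("BytesMoved = "
            + mbs.getAttribute(name, "BytesMoved"));
        System.out.println("BlocksFailed = "
            + mbs.getAttribute(name, "BlocksFailed"));
      }
    }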

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index a953800..fb91071 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -398,6 +398,7 @@ public class Dispatcher {
         LOG.info("Successfully moved " + this);
       } catch (IOException e) {
         LOG.warn("Failed to move " + this, e);
+        nnc.getBlocksFailed().incrementAndGet();
         target.getDDatanode().setHasFailure();
         // Check that the failure is due to block pinning errors.
         if (e instanceof BlockPinningException) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 4d05242..7634eaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -163,6 +163,7 @@ public class NameNodeConnector implements Closeable {
   private final List<Path> targetPaths;
   private final AtomicLong bytesMoved = new AtomicLong();
   private final AtomicLong blocksMoved = new AtomicLong();
+  private final AtomicLong blocksFailed = new AtomicLong();
 
   private final int maxNotChangedIterations;
   private int notChangedIterations = 0;
@@ -230,14 +231,18 @@ public class NameNodeConnector implements Closeable {
     return blockpoolID;
   }
 
-  AtomicLong getBytesMoved() {
+  public AtomicLong getBytesMoved() {
     return bytesMoved;
   }
 
-  AtomicLong getBlocksMoved() {
+  public AtomicLong getBlocksMoved() {
     return blocksMoved;
   }
 
+  public AtomicLong getBlocksFailed() {
+    return blocksFailed;
+  }
+
   public void addBytesMoved(long numBytes) {
     bytesMoved.addAndGet(numBytes);
     blocksMoved.incrementAndGet();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 8b9e9ed..dd7abf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.SecurityUtil;
@@ -118,6 +120,8 @@ public class Mover {
   private final int retryMaxAttempts;
   private final AtomicInteger retryCount;
   private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks;
+  private final MoverMetrics metrics;
+  private final NameNodeConnector nnc;
 
   private final BlockStoragePolicy[] blockStoragePolicies;
 
@@ -155,6 +159,8 @@ public class Mover {
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<
         BlockStoragePolicySuite.ID_BIT_LENGTH];
     this.excludedPinnedBlocks = excludedPinnedBlocks;
+    this.nnc = nnc;
+    this.metrics = MoverMetrics.create(this);
   }
 
   void init() throws IOException {
@@ -196,6 +202,10 @@ public class Mover {
     }
   }
 
+  public NameNodeConnector getNnc() {
+    return nnc;
+  }
+
   DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
                    ErasureCodingPolicy ecPolicy) {
     Block blk = lb.getBlock().getLocalBlock();
@@ -296,6 +306,7 @@ public class Mover {
      *         round
      */
     private Result processNamespace() throws IOException {
+      metrics.setProcessingNamespace(true);
       getSnapshottableDirs();
       Result result = new Result();
       for (Path target : targetPaths) {
@@ -322,6 +333,7 @@ public class Mover {
         retryCount.set(0);
       }
       result.updateHasRemaining(hasFailed);
+      metrics.setProcessingNamespace(false);
       return result;
     }
 
@@ -374,6 +386,7 @@ public class Mover {
             // the full path is a snapshot path but it is also included in the
             // current directory tree, thus ignore it.
             processFile(fullPath, (HdfsLocatedFileStatus) status, result);
+            metrics.incrFilesProcessed();
           }
         } catch (IOException e) {
           LOG.warn("Failed to check the status of " + parent
@@ -521,6 +534,7 @@ public class Mover {
         final PendingMove pm = source.addPendingMove(db, target);
         if (pm != null) {
           dispatcher.executePendingMove(pm);
+          metrics.incrBlocksScheduled();
           return true;
         }
       }
@@ -539,6 +553,7 @@ public class Mover {
             final PendingMove pm = source.addPendingMove(db, target);
             if (pm != null) {
               dispatcher.executePendingMove(pm);
+              metrics.incrBlocksScheduled();
               return true;
             }
           }
@@ -650,6 +665,11 @@ public class Mover {
     Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
     LOG.info("namenodes = " + namenodes);
 
+    DefaultMetricsSystem.initialize("Mover");
+    JvmMetrics.create("Mover",
+        conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
+        DefaultMetricsSystem.instance());
+
     checkKeytabAndInit(conf);
     List<NameNodeConnector> connectors = Collections.emptyList();
     try {
@@ -818,6 +838,7 @@ public class Mover {
         System.out.println(e + ".  Exiting ...");
         return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode();
       } finally {
+        DefaultMetricsSystem.shutdown();
        System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
        System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime));
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/MoverMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/MoverMetrics.java
new file mode 100644
index 0000000..846e6f6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/MoverMetrics.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+
+/**
+ * Metrics for HDFS Mover of a blockpool.
+ */
+@Metrics(about="Mover metrics", context="dfs")
+final class MoverMetrics {
+
+  private final Mover mover;
+
+  @Metric("If mover is processing namespace.")
+  private MutableGaugeInt processingNamespace;
+
+  @Metric("Number of blocks being scheduled.")
+  private MutableCounterLong blocksScheduled;
+
+  @Metric("Number of files being processed.")
+  private MutableCounterLong filesProcessed;
+
+  private MoverMetrics(Mover m) {
+    this.mover = m;
+  }
+
+  public static MoverMetrics create(Mover mover) {
+    MoverMetrics m = new MoverMetrics(mover);
+    return DefaultMetricsSystem.instance().register(
+        m.getName(), null, m);
+  }
+
+  String getName() {
+    return "Mover-" + mover.getNnc().getBlockpoolID();
+  }
+
+  @Metric("Bytes that already moved by mover.")
+  public long getBytesMoved() {
+    return mover.getNnc().getBytesMoved().get();
+  }
+
+  @Metric("Number of blocks that successfully moved by mover.")
+  public long getBlocksMoved() {
+    return mover.getNnc().getBlocksMoved().get();
+  }
+
+  @Metric("Number of blocks that failed moved by mover.")
+  public long getBlocksFailed() {
+    return mover.getNnc().getBlocksFailed().get();
+  }
+
+  void setProcessingNamespace(boolean processingNamespace) {
+    this.processingNamespace.set(processingNamespace ? 1 : 0);
+  }
+
+  void incrBlocksScheduled() {
+    this.blocksScheduled.incr();
+  }
+
+  void incrFilesProcessed() {
+    this.filesProcessed.incr();
+  }
+}
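
The counters above are published through whatever sinks the Mover's metrics configuration defines. As a sketch of how they could be consumed, the snippet below implements the metrics2 MetricsSink interface and simply prints every record it receives. The class name is made up for illustration, and wiring it up (for example via a "mover.sink.*" entry in hadoop-metrics2.properties, assuming the config prefix follows the "Mover" name passed to DefaultMetricsSystem.initialize in Mover.java) is an assumption, not part of this patch.

    // Illustrative sink; not included in the patch.
    import org.apache.commons.configuration2.SubsetConfiguration;
    import org.apache.hadoop.metrics2.AbstractMetric;
    import org.apache.hadoop.metrics2.MetricsRecord;
    import org.apache.hadoop.metrics2.MetricsSink;

    public class LoggingMoverSink implements MetricsSink {
      @Override
      public void init(SubsetConfiguration conf) {
        // Nothing to configure in this sketch.
      }

      @Override
      public void putMetrics(MetricsRecord record) {
        // Each record carries the source's gauges and counters, including
        // BytesMoved, BlocksMoved, BlocksFailed, BlocksScheduled and
        // FilesProcessed from the MoverMetrics source above.
        for (AbstractMetric metric : record.metrics()) {
          System.out.println(record.context() + "." + record.name()
              + ": " + metric.name() + " = " + metric.value());
        }
      }

      @Override
      public void flush() {
        // No buffering, nothing to flush.
      }
    }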
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/package-info.java
new file mode 100644
index 0000000..92db7b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Mover is a data migration tool for tiered storage.
+ * It scans provided paths in HDFS to check
+ * if the block placement satisfies the storage policy.
+ * For the blocks violating the storage policy,
+ * it moves the replicas to a different storage type
+ * in order to fulfill the storage policy requirement.
+ */
+package org.apache.hadoop.hdfs.server.mover;
\ No newline at end of file
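
The package-info above describes the workflow end to end: paths are tagged with a storage policy and the Mover then migrates the replicas that no longer match it. A minimal sketch of the first half is below, assuming a hypothetical /cold-data directory and the built-in COLD policy; after the policy change, the Mover is typically launched separately (for example with "hdfs mover -p /cold-data"), much as the new test further down drives it programmatically via Mover.run().

    // Sketch only: /cold-data is a hypothetical path used for illustration.
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class MarkPathCold {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new HdfsConfiguration());
        // Existing replicas stay on their current storage until the Mover
        // runs; only then are they migrated to ARCHIVE to satisfy COLD.
        fs.setStoragePolicy(new Path("/cold-data"), "COLD");
        fs.close();
      }
    }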
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index f428b2c..3cec739 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -36,6 +36,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBER
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import java.io.IOException;
@@ -86,12 +88,15 @@ import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1235,6 +1240,58 @@ public class TestMover {
     }
   }
 
+  @Test(timeout=100000)
+  public void testMoverMetrics() throws Exception {
+    long blockSize = 10*1024*1024;
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+        .build();
+
+    cluster.waitActive();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+
+    final String file = "/testMaxIterationTime.dat";
+    final Path path = new Path(file);
+    short repFactor = 1;
+    int seed = 0xFAFAFA;
+    // write to DISK
+    DFSTestUtil.createFile(fs, path, 4L * blockSize, repFactor, seed);
+
+    // move to ARCHIVE
+    fs.setStoragePolicy(new Path(file), "COLD");
+
+    Map<URI, List<Path>> nnWithPath = new HashMap<>();
+    List<Path> paths = new ArrayList<>();
+    paths.add(path);
+    nnWithPath
+        .put(DFSUtil.getInternalNsRpcUris(conf).iterator().next(), paths);
+
+    Mover.run(nnWithPath, conf);
+
+    final String moverMetricsName = "Mover-"
+        + cluster.getNameNode(0).getNamesystem().getBlockPoolId();
+    MetricsSource moverMetrics =
+        DefaultMetricsSystem.instance().getSource(moverMetricsName);
+    assertNotNull(moverMetrics);
+
+    MetricsRecordBuilder rb = MetricsAsserts.getMetrics(moverMetricsName);
+    // Check metrics
+    assertEquals(4, MetricsAsserts.getLongCounter("BlocksScheduled", rb));
+    assertEquals(1, MetricsAsserts.getLongCounter("FilesProcessed", rb));
+    assertEquals(41943040, MetricsAsserts.getLongGauge("BytesMoved", rb));
+    assertEquals(4, MetricsAsserts.getLongGauge("BlocksMoved", rb));
+    assertEquals(0, MetricsAsserts.getLongGauge("BlocksFailed", rb));
+  }
+
   private void createFileWithFavoredDatanodes(final Configuration conf,
       final MiniDFSCluster cluster, final DistributedFileSystem dfs)
           throws IOException {
