This is an automated email from the ASF dual-hosted git repository.

jing9 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 643dfd6 HDFS-15842. HDFS mover to emit metrics. (#2738) 643dfd6 is described below commit 643dfd60e442b35fc9bb2dbf32fcdf28c3dd2f58 Author: LeonGao <lia...@uber.com> AuthorDate: Sat Jun 19 15:39:46 2021 -0700 HDFS-15842. HDFS mover to emit metrics. (#2738) --- .../hadoop/hdfs/server/balancer/Dispatcher.java | 1 + .../hdfs/server/balancer/NameNodeConnector.java | 9 ++- .../org/apache/hadoop/hdfs/server/mover/Mover.java | 21 ++++++ .../hadoop/hdfs/server/mover/MoverMetrics.java | 83 ++++++++++++++++++++++ .../hadoop/hdfs/server/mover/package-info.java | 27 +++++++ .../apache/hadoop/hdfs/server/mover/TestMover.java | 57 +++++++++++++++ 6 files changed, 196 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index a953800..fb91071 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -398,6 +398,7 @@ public class Dispatcher { LOG.info("Successfully moved " + this); } catch (IOException e) { LOG.warn("Failed to move " + this, e); + nnc.getBlocksFailed().incrementAndGet(); target.getDDatanode().setHasFailure(); // Check that the failure is due to block pinning errors. 
if (e instanceof BlockPinningException) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 4d05242..7634eaf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -163,6 +163,7 @@ public class NameNodeConnector implements Closeable { private final List<Path> targetPaths; private final AtomicLong bytesMoved = new AtomicLong(); private final AtomicLong blocksMoved = new AtomicLong(); + private final AtomicLong blocksFailed = new AtomicLong(); private final int maxNotChangedIterations; private int notChangedIterations = 0; @@ -230,14 +231,18 @@ public class NameNodeConnector implements Closeable { return blockpoolID; } - AtomicLong getBytesMoved() { + public AtomicLong getBytesMoved() { return bytesMoved; } - AtomicLong getBlocksMoved() { + public AtomicLong getBlocksMoved() { return blocksMoved; } + public AtomicLong getBlocksFailed() { + return blocksFailed; + } + public void addBytesMoved(long numBytes) { bytesMoved.addAndGet(numBytes); blocksMoved.incrementAndGet(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 8b9e9ed..dd7abf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; import 
org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.SecurityUtil; @@ -118,6 +120,8 @@ public class Mover { private final int retryMaxAttempts; private final AtomicInteger retryCount; private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks; + private final MoverMetrics metrics; + private final NameNodeConnector nnc; private final BlockStoragePolicy[] blockStoragePolicies; @@ -155,6 +159,8 @@ public class Mover { this.blockStoragePolicies = new BlockStoragePolicy[1 << BlockStoragePolicySuite.ID_BIT_LENGTH]; this.excludedPinnedBlocks = excludedPinnedBlocks; + this.nnc = nnc; + this.metrics = MoverMetrics.create(this); } void init() throws IOException { @@ -196,6 +202,10 @@ public class Mover { } } + public NameNodeConnector getNnc() { + return nnc; + } + DBlock newDBlock(LocatedBlock lb, List<MLocation> locations, ErasureCodingPolicy ecPolicy) { Block blk = lb.getBlock().getLocalBlock(); @@ -296,6 +306,7 @@ public class Mover { * round */ private Result processNamespace() throws IOException { + metrics.setProcessingNamespace(true); getSnapshottableDirs(); Result result = new Result(); for (Path target : targetPaths) { @@ -322,6 +333,7 @@ public class Mover { retryCount.set(0); } result.updateHasRemaining(hasFailed); + metrics.setProcessingNamespace(false); return result; } @@ -374,6 +386,7 @@ public class Mover { // the full path is a snapshot path but it is also included in the // current directory tree, thus ignore it. 
processFile(fullPath, (HdfsLocatedFileStatus) status, result); + metrics.incrFilesProcessed(); } } catch (IOException e) { LOG.warn("Failed to check the status of " + parent @@ -521,6 +534,7 @@ public class Mover { final PendingMove pm = source.addPendingMove(db, target); if (pm != null) { dispatcher.executePendingMove(pm); + metrics.incrBlocksScheduled(); return true; } } @@ -539,6 +553,7 @@ public class Mover { final PendingMove pm = source.addPendingMove(db, target); if (pm != null) { dispatcher.executePendingMove(pm); + metrics.incrBlocksScheduled(); return true; } } @@ -650,6 +665,11 @@ public class Mover { Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>(); LOG.info("namenodes = " + namenodes); + DefaultMetricsSystem.initialize("Mover"); + JvmMetrics.create("Mover", + conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY), + DefaultMetricsSystem.instance()); + checkKeytabAndInit(conf); List<NameNodeConnector> connectors = Collections.emptyList(); try { @@ -818,6 +838,7 @@ public class Mover { System.out.println(e + ". Exiting ..."); return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode(); } finally { + DefaultMetricsSystem.shutdown(); System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/MoverMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/MoverMetrics.java new file mode 100644 index 0000000..846e6f6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/MoverMetrics.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.mover; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; + +/** + * Metrics for HDFS Mover of a blockpool. + */ +@Metrics(about="Mover metrics", context="dfs") +final class MoverMetrics { + + private final Mover mover; + + @Metric("If mover is processing namespace.") + private MutableGaugeInt processingNamespace; + + @Metric("Number of blocks being scheduled.") + private MutableCounterLong blocksScheduled; + + @Metric("Number of files being processed.") + private MutableCounterLong filesProcessed; + + private MoverMetrics(Mover m) { + this.mover = m; + } + + public static MoverMetrics create(Mover mover) { + MoverMetrics m = new MoverMetrics(mover); + return DefaultMetricsSystem.instance().register( + m.getName(), null, m); + } + + String getName() { + return "Mover-" + mover.getNnc().getBlockpoolID(); + } + + @Metric("Bytes that already moved by mover.") + public long getBytesMoved() { + return mover.getNnc().getBytesMoved().get(); + } + + @Metric("Number of blocks that successfully moved by mover.") + public long getBlocksMoved() { + return mover.getNnc().getBlocksMoved().get(); + } + + @Metric("Number of blocks 
that failed moved by mover.") + public long getBlocksFailed() { + return mover.getNnc().getBlocksFailed().get(); + } + + void setProcessingNamespace(boolean processingNamespace) { + this.processingNamespace.set(processingNamespace ? 1 : 0); + } + + void incrBlocksScheduled() { + this.blocksScheduled.incr(); + } + + void incrFilesProcessed() { + this.filesProcessed.incr(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/package-info.java new file mode 100644 index 0000000..92db7b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/package-info.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Mover is a data migration tool for tiered storage. + * It scans provided paths in HDFS to check + * if the block placement satisfies the storage policy. + * For the blocks violating the storage policy, + * it moves the replicas to a different storage type + * in order to fulfill the storage policy requirement. 
+ */ +package org.apache.hadoop.hdfs.server.mover; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index f428b2c..3cec739 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -36,6 +36,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBER import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.io.File; import java.io.IOException; @@ -86,12 +88,15 @@ import org.apache.hadoop.hdfs.server.mover.Mover.MLocation; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.util.ToolRunner; import org.junit.Assert; import org.junit.Test; @@ -1235,6 +1240,58 @@ public class TestMover { } } + @Test(timeout=100000) + public void testMoverMetrics() throws Exception { + long blockSize = 10*1024*1024; + final Configuration conf = new 
HdfsConfiguration(); + initConf(conf); + conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(2) + .storageTypes( + new StorageType[][] {{StorageType.DISK, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}) + .build(); + + cluster.waitActive(); + final DistributedFileSystem fs = cluster.getFileSystem(); + + final String file = "/testMaxIterationTime.dat"; + final Path path = new Path(file); + short repFactor = 1; + int seed = 0xFAFAFA; + // write to DISK + DFSTestUtil.createFile(fs, path, 4L * blockSize, repFactor, seed); + + // move to ARCHIVE + fs.setStoragePolicy(new Path(file), "COLD"); + + Map<URI, List<Path>> nnWithPath = new HashMap<>(); + List<Path> paths = new ArrayList<>(); + paths.add(path); + nnWithPath + .put(DFSUtil.getInternalNsRpcUris(conf).iterator().next(), paths); + + Mover.run(nnWithPath, conf); + + final String moverMetricsName = "Mover-" + + cluster.getNameNode(0).getNamesystem().getBlockPoolId(); + MetricsSource moverMetrics = + DefaultMetricsSystem.instance().getSource(moverMetricsName); + assertNotNull(moverMetrics); + + MetricsRecordBuilder rb = MetricsAsserts.getMetrics(moverMetricsName); + // Check metrics + assertEquals(4, MetricsAsserts.getLongCounter("BlocksScheduled", rb)); + assertEquals(1, MetricsAsserts.getLongCounter("FilesProcessed", rb)); + assertEquals(41943040, MetricsAsserts.getLongGauge("BytesMoved", rb)); + assertEquals(4, MetricsAsserts.getLongGauge("BlocksMoved", rb)); + assertEquals(0, MetricsAsserts.getLongGauge("BlocksFailed", rb)); + } + private void createFileWithFavoredDatanodes(final Configuration conf, final MiniDFSCluster cluster, final DistributedFileSystem dfs) throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: 
common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org