[ 
https://issues.apache.org/jira/browse/HDFS-16039?focusedWorklogId=609363&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-609363
 ]

ASF GitHub Bot logged work on HDFS-16039:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jun/21 20:10
            Start Date: 09/Jun/21 20:10
    Worklog Time Spent: 10m 
      Work Description: goiri commented on a change in pull request #3086:
URL: https://github.com/apache/hadoop/pull/3086#discussion_r648647841



##########
File path: hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
##########
@@ -372,12 +420,69 @@ private static void setStateStoreVersions(
 
   @Override
   public long getTotalCapacity() {
-    return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
+    return getNameserviceAggregatedLong(
+        DatanodeReportType.LIVE, DatanodeInfo::getCapacity);
+  }
+
+  public LoadingCache<DatanodeReportType, DatanodeInfo[]> getDnCache() {
+    return dnCache;
+  }
+
+  /**
+   * Get the aggregated value for a DatanodeReportType and
+   * a method for all nameservices.
+   * @param type a DatanodeReportType
+   * @param f Method reference
+   * @return Aggregated long.
+   */
+  public long getNameserviceAggregatedLong(
+      DatanodeReportType type, ToLongFunction<DatanodeInfo> f){
+    long size = 0;
+    try {
+      size = Arrays.stream(dnCache.get(type)).mapToLong(f).sum();

Review comment:
       Extract the get(type) call into a local variable?
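
For illustration, one way the suggested extraction could look (a sketch only; the local variable name is an assumption):

    DatanodeInfo[] datanodes = dnCache.get(type); // single cache lookup, may throw ExecutionException
    size = Arrays.stream(datanodes).mapToLong(f).sum();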

##########
File path: hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
##########
@@ -372,12 +420,69 @@ private static void setStateStoreVersions(
 
   @Override
   public long getTotalCapacity() {
-    return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
+    return getNameserviceAggregatedLong(
+        DatanodeReportType.LIVE, DatanodeInfo::getCapacity);
+  }
+
+  public LoadingCache<DatanodeReportType, DatanodeInfo[]> getDnCache() {
+    return dnCache;
+  }
+
+  /**
+   * Get the aggregated value for a DatanodeReportType and
+   * a method for all nameservices.
+   * @param type a DatanodeReportType
+   * @param f Method reference
+   * @return Aggregated long.
+   */
+  public long getNameserviceAggregatedLong(
+      DatanodeReportType type, ToLongFunction<DatanodeInfo> f){
+    long size = 0;
+    try {
+      size = Arrays.stream(dnCache.get(type)).mapToLong(f).sum();
+    } catch (ExecutionException e) {
+      LOG.debug("Cannot get " + type + " nodes", e.getMessage());
+    }
+    return size;
+  }
+
+  /**
+   * Get the aggregated value for a DatanodeReportType and
+   * a method for all nameservices.
+   * @param type a DatanodeReportType
+   * @param f Method reference
+   * @return Aggregated Integer.
+   */
+  public int getNameserviceAggregatedInt(
+      DatanodeReportType type, Predicate<DatanodeInfo> f){
+    int size = 0;
+    try {
+      Arrays.stream(dnCache.get(DatanodeReportType.LIVE)).filter(f).count();

Review comment:
       We are not updating size at all, are we?
   It is also not the most intuitive code to read; maybe extract a little.
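
For illustration, a sketch of a fix: the result of count() needs to be assigned to size, and the quoted line also hardcodes DatanodeReportType.LIVE where the type parameter was presumably intended (variable names are illustrative):

    DatanodeInfo[] datanodes = dnCache.get(type); // use the requested type, not hardcoded LIVE
    size = (int) Arrays.stream(datanodes).filter(f).count(); // assign the count to size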

##########
File path: hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
##########
@@ -500,6 +498,8 @@ private String getNodesImpl(final DatanodeReportType type) {
       LOG.error("Cannot get {} nodes, subclusters timed out responding", type);
     } catch (IOException e) {
       LOG.error("Cannot get " + type + " nodes", e);
+    } catch (ExecutionException e) {
+      LOG.error("Cannot get " + type + " nodes", e);

Review comment:
       Do we support the logger {} format?
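
For reference, SLF4J does support the {} placeholder here, and it logs a trailing Throwable argument together with its stack trace, so the new catch block could read (a sketch, not the committed code):

    } catch (ExecutionException e) {
      // {} is substituted with type; the trailing Throwable is logged with its stack trace
      LOG.error("Cannot get {} nodes", type, e);
    }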

##########
File path: hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
##########
@@ -372,12 +420,69 @@ private static void setStateStoreVersions(
 
   @Override
   public long getTotalCapacity() {
-    return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
+    return getNameserviceAggregatedLong(
+        DatanodeReportType.LIVE, DatanodeInfo::getCapacity);
+  }
+
+  public LoadingCache<DatanodeReportType, DatanodeInfo[]> getDnCache() {

Review comment:
       Add a javadoc explaining the purpose.
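
A possible javadoc, assuming the getter is exposed mainly for tests (the wording is a suggestion):

    /**
     * Get the cache of datanode reports, keyed by report type.
     * Entries expire after the configured interval and are reloaded
     * through getNodesImpl(). Exposed mainly for testing.
     * @return LoadingCache from DatanodeReportType to DatanodeInfo[].
     */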

##########
File path: hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
##########
@@ -372,12 +420,69 @@ private static void setStateStoreVersions(
 
   @Override
   public long getTotalCapacity() {
-    return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
+    return getNameserviceAggregatedLong(
+        DatanodeReportType.LIVE, DatanodeInfo::getCapacity);
+  }
+
+  public LoadingCache<DatanodeReportType, DatanodeInfo[]> getDnCache() {
+    return dnCache;
+  }
+
+  /**
+   * Get the aggregated value for a DatanodeReportType and
+   * a method for all nameservices.
+   * @param type a DatanodeReportType
+   * @param f Method reference
+   * @return Aggregated long.
+   */
+  public long getNameserviceAggregatedLong(
+      DatanodeReportType type, ToLongFunction<DatanodeInfo> f){
+    long size = 0;
+    try {
+      size = Arrays.stream(dnCache.get(type)).mapToLong(f).sum();
+    } catch (ExecutionException e) {
+      LOG.debug("Cannot get " + type + " nodes", e.getMessage());
+    }
+    return size;
+  }
+
+  /**
+   * Get the aggregated value for a DatanodeReportType and
+   * a method for all nameservices.
+   * @param type a DatanodeReportType
+   * @param f Method reference
+   * @return Aggregated Integer.
+   */
+  public int getNameserviceAggregatedInt(
+      DatanodeReportType type, Predicate<DatanodeInfo> f){
+    int size = 0;
+    try {
+      Arrays.stream(dnCache.get(DatanodeReportType.LIVE)).filter(f).count();
+    } catch (ExecutionException e) {
+      LOG.debug("Cannot get " + type + " nodes", e.getMessage());
+    }
+    return size;
+  }
+
+  /**
+   * Get the aggregated value for a DatanodeReportType for all nameservices.
+   * @param type a DatanodeReportType
+   * @return Aggregated Integer.
+   */
+  public int getNameserviceAggregatedLength(DatanodeReportType type){
+    int size = 0;
+    try {
+      size = dnCache.get(type).length;
+    } catch (ExecutionException e) {
+      LOG.debug("Cannot get " + type + " nodes", e.getMessage());

Review comment:
       Use the logger {} format.
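
Note that in the quoted line e.getMessage() has no matching placeholder, so SLF4J silently drops it. A sketch of a parameterized form that keeps the message:

    } catch (ExecutionException e) {
      LOG.debug("Cannot get {} nodes: {}", type, e.getMessage());
    }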

##########
File path: hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
##########
@@ -170,6 +181,43 @@ public RBFMetrics(Router router) throws IOException {
     this.topTokenRealOwners = conf.getInt(
         RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY,
         RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY_DEFAULT);
+    // Initialize the cache for the DN reports
+    this.dnReportTimeOut = conf.getTimeDuration(
+        RBFConfigKeys.DN_REPORT_TIME_OUT,
+        RBFConfigKeys.DN_REPORT_TIME_OUT_MS_DEFAULT, TimeUnit.MILLISECONDS);
+    long dnCacheExpire = conf.getTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE,
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
+    this.dnCache = CacheBuilder.newBuilder()
+        .expireAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS)
+        .build(
+            new CacheLoader<DatanodeReportType, DatanodeInfo[]>() {
+              @Override
+              public DatanodeInfo[] load(DatanodeReportType type) {
+                return getNodesImpl(type);
+              }
+            });
+  }
+
+  /**
+   * Get all the nodes in the federation from a particular type.
+   * @param type Type of the datanodes to check.
+   * @return DatanodeInfo[] with the nodes.
+   */
+  private DatanodeInfo[] getNodesImpl(DatanodeReportType type) {
+    DatanodeInfo[] datanodes = DatanodeInfo.EMPTY_ARRAY;
+    try {
+      RouterRpcServer rpcServer = this.router.getRpcServer();
+      datanodes = rpcServer.getDatanodeReport(type, false,
+              dnReportTimeOut);
+    } catch (StandbyException e) {
+      LOG.error("Cannot get {} nodes, Router in safe mode", type);
+    } catch (SubClusterTimeoutException e) {
+      LOG.error("Cannot get {} nodes, subclusters timed out responding", type);
+    } catch (IOException e) {
+      LOG.error("Cannot get " + type + " nodes", e);

Review comment:
       Use logger {} format.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 609363)
    Time Spent: 0.5h  (was: 20m)

> RBF:  Some indicators of RBFMetrics count inaccurately
> ------------------------------------------------------
>
>                 Key: HDFS-16039
>                 URL: https://issues.apache.org/jira/browse/HDFS-16039
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: rbf
>    Affects Versions: 3.4.0
>            Reporter: Xiangyi Zhu
>            Assignee: Xiangyi Zhu
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> RBFMetrics#getNumLiveNodes, getNumNamenodes, and getTotalCapacity
> currently accumulate the metrics reported by every NameNode, which leads
> to inaccurate counts. I think that for NameNodes sharing the same
> ClusterID we should take only the maximum value, and then accumulate
> across clusters.
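
For illustration, a sketch of the aggregation proposed above: take one maximum per nameservice (the NameNodes sharing a ClusterID), then sum across nameservices. The memberships collection and the exact accessors are assumptions for illustration only:

    // Hypothetical: memberships holds one MembershipState per NameNode.
    long total = memberships.stream()
        .collect(Collectors.groupingBy(
            MembershipState::getNameserviceId,      // NameNodes of the same cluster
            Collectors.mapping(
                m -> m.getStats().getTotalSpace(),
                Collectors.maxBy(Long::compare)))) // max within each cluster
        .values().stream()
        .mapToLong(opt -> opt.orElse(0L))          // one value per cluster
        .sum();                                    // accumulate across clusters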



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org
