[ 
https://issues.apache.org/jira/browse/HADOOP-16830?focusedWorklogId=492542&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-492542
 ]

ASF GitHub Bot logged work on HADOOP-16830:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Sep/20 16:33
            Start Date: 29/Sep/20 16:33
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request #2323:
URL: https://github.com/apache/hadoop/pull/2323#discussion_r496880269



##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsStoreImpl.java
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import javax.annotation.Nullable;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.aggregateMaximums;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.aggregateMinimums;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.dynamicIOStatistics;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.maybeUpdateMaximum;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.maybeUpdateMinimum;
+
+/**
+ * Implementation of {@link IOStatisticsStore}.
+ */
+final class IOStatisticsStoreImpl extends WrappedIOStatistics
+    implements IOStatisticsStore {
+
+  /**
+   * Log changes at debug.
+   * Noisy, but occasionally useful.
+   */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsStoreImpl.class);
+
+  private final Map<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+
+  private final Map<String, AtomicLong> gaugeMap = new ConcurrentHashMap<>();
+
+  private final Map<String, AtomicLong> minimumMap = new ConcurrentHashMap<>();
+
+  private final Map<String, AtomicLong> maximumMap = new ConcurrentHashMap<>();
+
+  private final Map<String, MeanStatistic> meanStatisticMap
+      = new ConcurrentHashMap<>();
+
+  /**
+   * Constructor invoked via the builder.
+   * @param counters keys to use for the counter statistics.
+   * @param gauges names of gauges
+   * @param minimums names of minimums
+   * @param maximums names of maximums
+   * @param meanStatistics names of mean statistics.
+   */
+  IOStatisticsStoreImpl(
+      final List<String> counters,
+      final List<String> gauges,
+      final List<String> minimums,
+      final List<String> maximums,
+      final List<String> meanStatistics) {
+    // initially create the superclass with no wrapped mapping;
+    super(null);
+
+    // now construct a dynamic statistics source mapping to
+    // the various counters, gauges etc dynamically created
+    // into maps
+    DynamicIOStatisticsBuilder builder = dynamicIOStatistics();
+    if (counters != null) {
+      for (String key : counters) {
+        AtomicLong counter = new AtomicLong();
+        counterMap.put(key, counter);
+        builder.withAtomicLongCounter(key, counter);
+      }
+    }
+    if (gauges != null) {
+      for (String key : gauges) {
+        AtomicLong gauge = new AtomicLong();
+        gaugeMap.put(key, gauge);
+        builder.withAtomicLongGauge(key, gauge);
+      }
+    }
+    if (maximums != null) {
+      for (String key : maximums) {
+        AtomicLong maximum = new AtomicLong(MAX_UNSET_VALUE);
+        maximumMap.put(key, maximum);
+        builder.withAtomicLongMaximum(key, maximum);
+      }
+    }
+    if (minimums != null) {
+      for (String key : minimums) {
+        AtomicLong minimum = new AtomicLong(MIN_UNSET_VALUE);
+        minimumMap.put(key, minimum);
+        builder.withAtomicLongMinimum(key, minimum);
+      }
+    }
+    if (meanStatistics != null) {
+      for (String key : meanStatistics) {
+        meanStatisticMap.put(key, new MeanStatistic());
+        builder.withMeanStatisticFunction(key, k -> meanStatisticMap.get(k));
+      }
+    }
+    setWrapped(builder.build());
+  }
+
+  /**
+   * Set an atomic long to a value.
+   * @param aLong atomic long; may be null
+   * @param value value to set to
+   */
+  private void setAtomicLong(final AtomicLong aLong, final long value) {
+    // a null reference is silently ignored
+    if (aLong == null) {
+      return;
+    }
+    aLong.set(value);
+  }
+
+  /**
+   * increment an atomic long and return its value;
+   * null long is no-op returning 0.
+   * @param aLong atomic long; may be null
+   * @param increment amount to increment; -ve for a decrement
+   * @return final value or 0
+   */
+  private long incAtomicLong(final AtomicLong aLong,
+      final long increment) {
+    // with nothing to update, report 0 instead of failing
+    return aLong == null ? 0 : aLong.addAndGet(increment);
+  }
+
+  @Override
+  public void setCounter(final String key, final long value) {
+    // a key with no registered counter yields null, which the
+    // helper ignores; the debug message is emitted either way.
+    final AtomicLong counter = counterMap.get(key);
+    setAtomicLong(counter, value);
+    LOG.debug("Setting counter {} to {}", key, value);
+  }
+
+  @Override
+  public long incrementCounter(final String key, final long value) {

Review comment:
       fixed now




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 492542)
    Time Spent: 5h 50m  (was: 5h 40m)

> Add public IOStatistics API
> ---------------------------
>
>                 Key: HADOOP-16830
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16830
>             Project: Hadoop Common
>          Issue Type: New Feature
>          Components: fs, fs/s3
>    Affects Versions: 3.3.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Applications like to collect the statistics which specific operations take, 
> by collecting exactly those operations done during the execution of FS API 
> calls by their individual worker threads, and returning these to their job 
> driver
> * S3A has a statistics API for some streams, but it's a non-standard one; 
> Impala &c can't use it
> * FileSystem storage statistics are public, but as they aren't cross-thread, 
> they don't aggregate properly
> Proposed
> # A new IOStatistics interface to serve up statistics
> # S3A to implement
> # other stores to follow
> # Pass-through from the usual wrapper classes (FS data input/output streams)
> It's hard to think about how best to offer an API for operation context 
> stats, and how to actually implement it.
> ThreadLocal isn't enough because the helper threads need to update the 
> thread-local value of the instigator.
> My initial PoC doesn't address that issue, but it shows what I'm thinking of.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to