This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cf83d475127 HIVE-28638: Refactor stats handling in StatsRecordingThreadPool (#5557) (Laszlo Bodor reviewed by Dmitriy Fingerman)
cf83d475127 is described below

commit cf83d4751271622a9a700c6f2330dbfff38801d2
Author: Bodor Laszlo <[email protected]>
AuthorDate: Thu Dec 5 16:39:44 2024 +0100

    HIVE-28638: Refactor stats handling in StatsRecordingThreadPool (#5557) (Laszlo Bodor reviewed by Dmitriy Fingerman)
---
 .../hive/llap/LlapThreadLocalStatistics.java       | 176 +++++++++++++++++++++
 .../java/org/apache/hadoop/hive/llap/LlapUtil.java | 108 -------------
 .../hive/llap/TestLlapThreadLocalStatistics.java   |  94 +++++++++++
 .../llap/daemon/impl/StatsRecordingThreadPool.java | 122 ++++----------
 4 files changed, 298 insertions(+), 202 deletions(-)

diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapThreadLocalStatistics.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapThreadLocalStatistics.java
new file mode 100644
index 00000000000..425a5101a42
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapThreadLocalStatistics.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import java.lang.management.ThreadMXBean;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Convenience class for handling thread local statistics for different 
schemes in LLAP.
+ * The motivation is that thread local stats (available from 
FileSystem.getAllStatistics()) are in a List, which
+ * doesn't guarantee that for a single scheme, a single Statistics object is 
returned (e.g. in case of multiple
+ * namenodes). This class encapsulates that data, and takes care of merging 
them transparently.
+ * LlapThreadLocalStatistics is used in LLAP's task ThreadPool, where the 
current thread's statistics is calculated by a
+ * simple delta computation (before/after running the task callable), like:
+ *
+ * LlapThreadLocalStatistics statsBefore = new LlapThreadLocalStatistics(...);
+ * LlapThreadLocalStatistics diff = new 
LlapThreadLocalStatistics(...).subtract(statsBefore);
+ */
+public class LlapThreadLocalStatistics {
+
+  /**
+   * LLAP IO related counters.
+   */
+  public enum LlapExecutorCounters {
+    EXECUTOR_CPU_NS,
+    EXECUTOR_USER_NS;
+  }
+
+  @VisibleForTesting
+  Map<String, LlapFileSystemStatisticsData> schemeToThreadLocalStats = new 
HashMap<>();
+  @VisibleForTesting
+  long cpuTime;
+  @VisibleForTesting
+  long userTime;
+
+  /**
+   * In this constructor we create a snapshot of the current thread local 
statistics and take care of merging the
+   * ones that belong to the same scheme.
+   */
+  public LlapThreadLocalStatistics(ThreadMXBean mxBean) {
+    this(mxBean, FileSystem.getAllStatistics());
+  }
+
+  /**
+   * Merges the list to a map.
+   * Input list:
+   * 1. FileSystem.Statistics (scheme: file)
+   * 2. FileSystem.Statistics (scheme: hdfs)
+   * 3. FileSystem.Statistics (scheme: hdfs)
+   * Output map:
+   * file : LlapThreadLocalStatistics.StatisticsData (1)
+   * hdfs : LlapThreadLocalStatistics.StatisticsData (2 + 3)
+   */
+  public LlapThreadLocalStatistics(ThreadMXBean mxBean, 
List<FileSystem.Statistics> allStatistics) {
+    cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime();
+    userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
+
+    for (FileSystem.Statistics statistics : allStatistics) {
+      schemeToThreadLocalStats.merge(statistics.getScheme(),
+          new LlapFileSystemStatisticsData(statistics.getThreadStatistics()),
+          (statsCurrent, statsNew) -> 
statsCurrent.merge(statistics.getThreadStatistics()));
+    }
+  }
+
+  // This method iterates on the other LlapThreadLocalStatistics's schemes, 
and subtract them from this one if it's
+  // present here too.
+  public LlapThreadLocalStatistics subtract(LlapThreadLocalStatistics other) {
+    for (Map.Entry<String, LlapFileSystemStatisticsData> otherStats :
+        other.schemeToThreadLocalStats.entrySet()){
+      schemeToThreadLocalStats.computeIfPresent(otherStats.getKey(),
+          (thisScheme, stats) -> stats.subtract(otherStats.getValue()));
+    }
+
+    cpuTime -= other.cpuTime;
+    userTime -= other.userTime;
+
+    return this;
+  }
+
+  public void fill(TezCounters tezCounters) {
+    for (Map.Entry<String, LlapFileSystemStatisticsData> threadLocalStats :
+        schemeToThreadLocalStats.entrySet()){
+      String scheme = threadLocalStats.getKey();
+      LlapFileSystemStatisticsData stats = threadLocalStats.getValue();
+      tezCounters.findCounter(scheme, 
FileSystemCounter.BYTES_READ).increment(stats.bytesRead);
+      tezCounters.findCounter(scheme, 
FileSystemCounter.BYTES_WRITTEN).increment(stats.bytesWritten);
+      tezCounters.findCounter(scheme, 
FileSystemCounter.READ_OPS).increment(stats.readOps);
+      tezCounters.findCounter(scheme, 
FileSystemCounter.LARGE_READ_OPS).increment(stats.largeReadOps);
+      tezCounters.findCounter(scheme, 
FileSystemCounter.WRITE_OPS).increment(stats.writeOps);
+    }
+
+    if (cpuTime >= 0 && userTime >= 0) {
+      
tezCounters.findCounter(LlapThreadLocalStatistics.LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
+      
tezCounters.findCounter(LlapThreadLocalStatistics.LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
+    }
+  }
+
+  public String toString(){
+    return String.format("LlapThreadLocalStatistics: %s", 
schemeToThreadLocalStats.toString());
+  }
+
+  /**
+   * Convenience class over Hadoop's FileSystem.Statistics.StatisticsData.
+   * Unfortunately, neither the fields, nor the convenience methods (e.g. 
StatisticsData.add, StatisticsData.negate)
+   * are available here as they are package protected, so we cannot reuse them.
+   */
+  public static class LlapFileSystemStatisticsData {
+    long bytesRead;
+    long bytesWritten;
+    int readOps;
+    int largeReadOps;
+    int writeOps;
+
+    public LlapFileSystemStatisticsData(FileSystem.Statistics.StatisticsData 
fsStats) {
+      this.bytesRead = fsStats.getBytesRead();
+      this.bytesWritten = fsStats.getBytesWritten();
+      this.readOps = fsStats.getReadOps();
+      this.largeReadOps = fsStats.getLargeReadOps();
+      this.writeOps = fsStats.getWriteOps();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(" bytesRead: ").append(bytesRead);
+      sb.append(" bytesWritten: ").append(bytesWritten);
+      sb.append(" readOps: ").append(readOps);
+      sb.append(" largeReadOps: ").append(largeReadOps);
+      sb.append(" writeOps: ").append(writeOps);
+      return sb.toString();
+    }
+
+    public LlapFileSystemStatisticsData 
merge(FileSystem.Statistics.StatisticsData other) {
+      this.bytesRead += other.getBytesRead();
+      this.bytesWritten += other.getBytesWritten();
+      this.readOps += other.getReadOps();
+      this.largeReadOps += other.getLargeReadOps();
+      this.writeOps += other.getWriteOps();
+      return this;
+    }
+
+    public LlapFileSystemStatisticsData subtract(LlapFileSystemStatisticsData 
other) {
+      if (other == null){
+        return this;
+      }
+      this.bytesRead -= other.bytesRead;
+      this.bytesWritten -= other.bytesWritten;
+      this.readOps -= other.readOps;
+      this.largeReadOps -= other.largeReadOps;
+      this.writeOps -= other.writeOps;
+      return this;
+    }
+  }
+}
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 006634dd725..f7cfbd77619 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -18,17 +18,12 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -125,109 +120,6 @@ public class LlapUtil {
     return (components == null || components.length != 3) ? principal : 
components[0];
   }
 
-  public static List<StatisticsData> getStatisticsForScheme(final String 
scheme,
-      final List<StatisticsData> stats) {
-    List<StatisticsData> result = new ArrayList<>();
-    if (stats != null && scheme != null) {
-      for (StatisticsData s : stats) {
-        if (s.getScheme().equalsIgnoreCase(scheme)) {
-          result.add(s);
-        }
-      }
-    }
-    return result;
-  }
-
-  public static Map<String, FileSystem.Statistics> 
getCombinedFileSystemStatistics() {
-    final List<FileSystem.Statistics> allStats = FileSystem.getAllStatistics();
-    final Map<String, FileSystem.Statistics> result = new HashMap<>();
-    for (FileSystem.Statistics statistics : allStats) {
-      final String scheme = statistics.getScheme();
-      if (result.containsKey(scheme)) {
-        FileSystem.Statistics existing = result.get(scheme);
-        FileSystem.Statistics combined = combineFileSystemStatistics(existing, 
statistics);
-        result.put(scheme, combined);
-      } else {
-        result.put(scheme, statistics);
-      }
-    }
-    return result;
-  }
-
-  private static FileSystem.Statistics combineFileSystemStatistics(final 
FileSystem.Statistics s1,
-      final FileSystem.Statistics s2) {
-    FileSystem.Statistics result = new FileSystem.Statistics(s1);
-    result.incrementReadOps(s2.getReadOps());
-    result.incrementLargeReadOps(s2.getLargeReadOps());
-    result.incrementWriteOps(s2.getWriteOps());
-    result.incrementBytesRead(s2.getBytesRead());
-    result.incrementBytesWritten(s2.getBytesWritten());
-    return result;
-  }
-
-  public static List<StatisticsData> cloneThreadLocalFileSystemStatistics() {
-    List<StatisticsData> result = new ArrayList<>();
-    // thread local filesystem stats is private and cannot be cloned. So make 
a copy to new class
-    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
-      result.add(new StatisticsData(statistics.getScheme(), 
statistics.getThreadStatistics()));
-    }
-    return result;
-  }
-
-  public static class StatisticsData {
-    long bytesRead;
-    long bytesWritten;
-    int readOps;
-    int largeReadOps;
-    int writeOps;
-    String scheme;
-
-    public StatisticsData(String scheme, FileSystem.Statistics.StatisticsData 
fsStats) {
-      this.scheme = scheme;
-      this.bytesRead = fsStats.getBytesRead();
-      this.bytesWritten = fsStats.getBytesWritten();
-      this.readOps = fsStats.getReadOps();
-      this.largeReadOps = fsStats.getLargeReadOps();
-      this.writeOps = fsStats.getWriteOps();
-    }
-
-    public long getBytesRead() {
-      return bytesRead;
-    }
-
-    public long getBytesWritten() {
-      return bytesWritten;
-    }
-
-    public int getReadOps() {
-      return readOps;
-    }
-
-    public int getLargeReadOps() {
-      return largeReadOps;
-    }
-
-    public int getWriteOps() {
-      return writeOps;
-    }
-
-    public String getScheme() {
-      return scheme;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(" scheme: ").append(scheme);
-      sb.append(" bytesRead: ").append(bytesRead);
-      sb.append(" bytesWritten: ").append(bytesWritten);
-      sb.append(" readOps: ").append(readOps);
-      sb.append(" largeReadOps: ").append(largeReadOps);
-      sb.append(" writeOps: ").append(writeOps);
-      return sb.toString();
-    }
-  }
-
   public static String getAmHostNameFromAddress(InetSocketAddress address, 
Configuration conf) {
     if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_USE_FQDN)) 
{
       return address.getHostName();
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/TestLlapThreadLocalStatistics.java b/llap-common/src/test/org/apache/hadoop/hive/llap/TestLlapThreadLocalStatistics.java
new file mode 100644
index 00000000000..8955ccdec4f
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/TestLlapThreadLocalStatistics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestLlapThreadLocalStatistics {
+
+  private static final ThreadMXBean mxBean = LlapUtil.initThreadMxBean();
+  private static final String FILE = "file";
+  private static final String HDFS = "hdfs";
+
+  @Test
+  public void testEmptyStatistics() {
+    LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean, 
new ArrayList<>());
+    LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean, 
new ArrayList<>());
+    Assert.assertEquals(0, 
after.subtract(before).schemeToThreadLocalStats.keySet().size());
+  }
+
+  @Test
+  public void testCpuTimeUserTime() throws Exception {
+    LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean, 
new ArrayList<>());
+    Assert.assertTrue("cpuTime should be >0", before.cpuTime > 0);
+    Assert.assertTrue("userTime should be >0", before.userTime > 0);
+
+    Thread.sleep(100);
+
+    LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean, 
new ArrayList<>());
+    Assert.assertTrue("cpuTime should increase", after.cpuTime > 
before.cpuTime);
+    // userTime assertion is flaky, not checked here
+  }
+
+  @Test
+  public void testCountersMergedForTheSameScheme() {
+    LlapThreadLocalStatistics stats = new LlapThreadLocalStatistics(mxBean,
+        createMockStatistics(new String[]{FILE, HDFS, HDFS}, new Integer[]{1, 
1, 1}));
+    Assert.assertEquals(1, stats.schemeToThreadLocalStats.get(FILE).bytesRead);
+    Assert.assertEquals(2, stats.schemeToThreadLocalStats.get(HDFS).bytesRead);
+  }
+
+  @Test
+  public void testCountersBeforeAfter() {
+    LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean,
+        createMockStatistics(new String[]{FILE, HDFS, HDFS}, new Integer[]{1, 
1, 1}));
+    LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean,
+        createMockStatistics(new String[]{FILE, HDFS, HDFS}, new Integer[]{3, 
1, 4}));
+
+    Assert.assertEquals(1, 
before.schemeToThreadLocalStats.get(FILE).bytesRead);
+    Assert.assertEquals(2, 
before.schemeToThreadLocalStats.get(HDFS).bytesRead);
+    Assert.assertEquals(3, after.schemeToThreadLocalStats.get(FILE).bytesRead);
+    Assert.assertEquals(5, after.schemeToThreadLocalStats.get(HDFS).bytesRead);
+
+    after.subtract(before);
+
+    // file: 3 - 1
+    Assert.assertEquals(2, after.schemeToThreadLocalStats.get(FILE).bytesRead);
+    // hdfs: (1 + 4) - (1 + 1)
+    Assert.assertEquals(3, after.schemeToThreadLocalStats.get(HDFS).bytesRead);
+  }
+
+  private List<FileSystem.Statistics> createMockStatistics(String[] schemes, 
Integer[] values) {
+    return IntStream.range(0, schemes.length).mapToObj(i -> {
+      FileSystem.Statistics stat = new FileSystem.Statistics(schemes[i]);
+      stat.incrementBytesRead(values[i]);
+      return stat;
+    }).collect(Collectors.toList());
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 873bdc73500..62db0588812 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,12 +17,8 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
-import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.Stack;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -32,9 +28,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.llap.LlapThreadLocalStatistics;
 import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.counters.FileSystemCounter;
@@ -110,19 +105,12 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       }
 
       // clone thread local file system statistics
-      List<LlapUtil.StatisticsData> statsBefore = 
LlapUtil.cloneThreadLocalFileSystemStatistics();
-      long cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime(),
-          userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
+      LlapThreadLocalStatistics statsBefore = new 
LlapThreadLocalStatistics(mxBean);
       setupMDCFromNDC(actualCallable);
       try {
         return actualCallable.call();
       } finally {
-        if (mxBean != null) {
-          cpuTime = mxBean.getCurrentThreadCpuTime() - cpuTime;
-          userTime = mxBean.getCurrentThreadUserTime() - userTime;
-        }
-        updateCounters(statsBefore, actualCallable, cpuTime, userTime);
-
+        updateCounters(statsBefore, actualCallable);
         MDC.clear();
       }
     }
@@ -161,93 +149,23 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       }
     }
 
-    /**
-     * LLAP IO related counters.
-     */
-    public enum LlapExecutorCounters {
-      EXECUTOR_CPU_NS,
-      EXECUTOR_USER_NS;
-
-    }
-
-    private void updateCounters(final List<LlapUtil.StatisticsData> 
statsBefore,
-        final Callable<V> actualCallable, long cpuTime, long userTime) {
+    private void updateCounters(final LlapThreadLocalStatistics statsBefore,
+        final Callable<V> actualCallable) {
       Thread thread = Thread.currentThread();
-      TezCounters tezCounters = null;
-      // add tez counters for task execution and llap io
-      if (actualCallable instanceof TaskRunner2Callable) {
-        TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) 
actualCallable;
-        // counters for task execution side
-        tezCounters = 
taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
-      } else if (actualCallable instanceof TezCounterSource) {
-        // Other counter sources (currently used in LLAP IO).
-        tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
-      } else {
-        LOG.warn("Unexpected callable {}; cannot get counters", 
actualCallable);
-      }
+      final TezCounters tezCounters = getTezCounters(actualCallable);
 
       if (tezCounters != null) {
-        if (cpuTime >= 0 && userTime >= 0) {
-          
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
-          
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
-        }
         if (statsBefore != null) {
-          // if there are multiple stats for the same scheme (from different 
NameNode), this
-          // method will squash them together
-          Map<String, FileSystem.Statistics> schemeToStats = LlapUtil
-              .getCombinedFileSystemStatistics();
-          for (Map.Entry<String, FileSystem.Statistics> entry : 
schemeToStats.entrySet()) {
-            final String scheme = entry.getKey();
-            FileSystem.Statistics statistics = entry.getValue();
-            FileSystem.Statistics.StatisticsData threadFSStats = statistics
-                .getThreadStatistics();
-            List<LlapUtil.StatisticsData> allStatsBefore = LlapUtil
-                .getStatisticsForScheme(scheme, statsBefore);
-            long bytesReadDelta = 0;
-            long bytesWrittenDelta = 0;
-            long readOpsDelta = 0;
-            long largeReadOpsDelta = 0;
-            long writeOpsDelta = 0;
-            // there could be more scheme after execution as execution might 
be accessing a
-            // different filesystem. So if we don't find a matching scheme 
before execution we
-            // just use the after execution values directly without computing 
delta difference
-            if (allStatsBefore != null && !allStatsBefore.isEmpty()) {
-              for (LlapUtil.StatisticsData sb : allStatsBefore) {
-                bytesReadDelta += threadFSStats.getBytesRead() - 
sb.getBytesRead();
-                bytesWrittenDelta += threadFSStats.getBytesWritten() - 
sb.getBytesWritten();
-                readOpsDelta += threadFSStats.getReadOps() - sb.getReadOps();
-                largeReadOpsDelta += threadFSStats.getLargeReadOps() - 
sb.getLargeReadOps();
-                writeOpsDelta += threadFSStats.getWriteOps() - 
sb.getWriteOps();
-              }
-            } else {
-              bytesReadDelta = threadFSStats.getBytesRead();
-              bytesWrittenDelta = threadFSStats.getBytesWritten();
-              readOpsDelta = threadFSStats.getReadOps();
-              largeReadOpsDelta = threadFSStats.getLargeReadOps();
-              writeOpsDelta = threadFSStats.getWriteOps();
-            }
-            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ)
-                .increment(bytesReadDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN)
-                .increment(bytesWrittenDelta);
-            tezCounters.findCounter(scheme, 
FileSystemCounter.READ_OPS).increment(readOpsDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS)
-                .increment(largeReadOpsDelta);
-            tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS)
-                .increment(writeOpsDelta);
+          LlapThreadLocalStatistics currentStats = new 
LlapThreadLocalStatistics(mxBean);
+          currentStats.subtract(statsBefore).fill(tezCounters);
 
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Updated stats: instance: {} thread name: {} thread 
id: {} scheme: {} " +
-                      "bytesRead: {} bytesWritten: {} readOps: {} 
largeReadOps: {} writeOps: {}",
-                  actualCallable.getClass().getSimpleName(), thread.getName(), 
thread.getId(),
-                  scheme, bytesReadDelta, bytesWrittenDelta, readOpsDelta, 
largeReadOpsDelta,
-                  writeOpsDelta);
-            }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Updated stats: instance: {} thread name: {} thread id: 
{} stats: {}",
+                actualCallable.getClass().getSimpleName(), thread.getName(), 
thread.getId(), currentStats);
           }
         } else {
           LOG.warn("File system statistics snapshot before execution of thread 
is null." +
-                  "Thread name: {} id: {} allStats: {}", thread.getName(), 
thread.getId(),
-              statsBefore);
+                  "Thread name: {} id: {}", thread.getName(), thread.getId());
         }
       } else {
         LOG.warn("TezCounters is null for callable type: {}",
@@ -255,4 +173,20 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       }
     }
   }
+
+  private static <V> TezCounters getTezCounters(Callable<V> actualCallable) {
+    TezCounters tezCounters = null;
+    // add tez counters for task execution and llap io
+    if (actualCallable instanceof TaskRunner2Callable) {
+      TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) 
actualCallable;
+      // counters for task execution side
+      tezCounters = 
taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
+    } else if (actualCallable instanceof TezCounterSource) {
+      // Other counter sources (currently used in LLAP IO).
+      tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
+    } else {
+      LOG.warn("Unexpected callable {}; cannot get counters", actualCallable);
+    }
+    return tezCounters;
+  }
 }

Reply via email to