This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new cf83d475127 HIVE-28638: Refactor stats handling in
StatsRecordingThreadPool (#5557) (Laszlo Bodor reviewed by Dmitriy Fingerman)
cf83d475127 is described below
commit cf83d4751271622a9a700c6f2330dbfff38801d2
Author: Bodor Laszlo <[email protected]>
AuthorDate: Thu Dec 5 16:39:44 2024 +0100
HIVE-28638: Refactor stats handling in StatsRecordingThreadPool (#5557)
(Laszlo Bodor reviewed by Dmitriy Fingerman)
---
.../hive/llap/LlapThreadLocalStatistics.java | 176 +++++++++++++++++++++
.../java/org/apache/hadoop/hive/llap/LlapUtil.java | 108 -------------
.../hive/llap/TestLlapThreadLocalStatistics.java | 94 +++++++++++
.../llap/daemon/impl/StatsRecordingThreadPool.java | 122 ++++----------
4 files changed, 298 insertions(+), 202 deletions(-)
diff --git
a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapThreadLocalStatistics.java
b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapThreadLocalStatistics.java
new file mode 100644
index 00000000000..425a5101a42
--- /dev/null
+++
b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapThreadLocalStatistics.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import java.lang.management.ThreadMXBean;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Convenience class for handling thread local statistics for different
schemes in LLAP.
+ * The motivation is that thread local stats (available from
FileSystem.getAllStatistics()) are in a List, which
+ * doesn't guarantee that for a single scheme, a single Statistics object is
returned (e.g. in case of multiple
+ * namenodes). This class encapsulates that data, and takes care of merging
them transparently.
+ * LlapThreadLocalStatistics is used in LLAP's task ThreadPool, where the
current thread's statistics is calculated by a
+ * simple delta computation (before/after running the task callable), like:
+ *
+ * LlapThreadLocalStatistics statsBefore = new LlapThreadLocalStatistics(...);
+ * LlapThreadLocalStatistics diff = new
LlapThreadLocalStatistics(...).subtract(statsBefore);
+ */
+public class LlapThreadLocalStatistics {
+
+ /**
+ * LLAP IO related counters.
+ */
+ public enum LlapExecutorCounters {
+ EXECUTOR_CPU_NS,
+ EXECUTOR_USER_NS;
+ }
+
+ @VisibleForTesting
+ Map<String, LlapFileSystemStatisticsData> schemeToThreadLocalStats = new
HashMap<>();
+ @VisibleForTesting
+ long cpuTime;
+ @VisibleForTesting
+ long userTime;
+
+ /**
+ * In this constructor we create a snapshot of the current thread local
statistics and take care of merging the
+ * ones that belong to the same scheme.
+ */
+ public LlapThreadLocalStatistics(ThreadMXBean mxBean) {
+ this(mxBean, FileSystem.getAllStatistics());
+ }
+
+ /**
+ * Merges the list to a map.
+ * Input list:
+ * 1. FileSystem.Statistics (scheme: file)
+ * 2. FileSystem.Statistics (scheme: hdfs)
+ * 3. FileSystem.Statistics (scheme: hdfs)
+ * Output map:
+ * file : LlapThreadLocalStatistics.StatisticsData (1)
+ * hdfs : LlapThreadLocalStatistics.StatisticsData (2 + 3)
+ */
+ public LlapThreadLocalStatistics(ThreadMXBean mxBean,
List<FileSystem.Statistics> allStatistics) {
+ cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime();
+ userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
+
+ for (FileSystem.Statistics statistics : allStatistics) {
+ schemeToThreadLocalStats.merge(statistics.getScheme(),
+ new LlapFileSystemStatisticsData(statistics.getThreadStatistics()),
+ (statsCurrent, statsNew) ->
statsCurrent.merge(statistics.getThreadStatistics()));
+ }
+ }
+
+ // This method iterates on the other LlapThreadLocalStatistics's schemes,
and subtract them from this one if it's
+ // present here too.
+ public LlapThreadLocalStatistics subtract(LlapThreadLocalStatistics other) {
+ for (Map.Entry<String, LlapFileSystemStatisticsData> otherStats :
+ other.schemeToThreadLocalStats.entrySet()){
+ schemeToThreadLocalStats.computeIfPresent(otherStats.getKey(),
+ (thisScheme, stats) -> stats.subtract(otherStats.getValue()));
+ }
+
+ cpuTime -= other.cpuTime;
+ userTime -= other.userTime;
+
+ return this;
+ }
+
+ public void fill(TezCounters tezCounters) {
+ for (Map.Entry<String, LlapFileSystemStatisticsData> threadLocalStats :
+ schemeToThreadLocalStats.entrySet()){
+ String scheme = threadLocalStats.getKey();
+ LlapFileSystemStatisticsData stats = threadLocalStats.getValue();
+ tezCounters.findCounter(scheme,
FileSystemCounter.BYTES_READ).increment(stats.bytesRead);
+ tezCounters.findCounter(scheme,
FileSystemCounter.BYTES_WRITTEN).increment(stats.bytesWritten);
+ tezCounters.findCounter(scheme,
FileSystemCounter.READ_OPS).increment(stats.readOps);
+ tezCounters.findCounter(scheme,
FileSystemCounter.LARGE_READ_OPS).increment(stats.largeReadOps);
+ tezCounters.findCounter(scheme,
FileSystemCounter.WRITE_OPS).increment(stats.writeOps);
+ }
+
+ if (cpuTime >= 0 && userTime >= 0) {
+
tezCounters.findCounter(LlapThreadLocalStatistics.LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
+
tezCounters.findCounter(LlapThreadLocalStatistics.LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
+ }
+ }
+
+ public String toString(){
+ return String.format("LlapThreadLocalStatistics: %s",
schemeToThreadLocalStats.toString());
+ }
+
+ /**
+ * Convenience class over Hadoop's FileSystem.Statistics.StatisticsData.
+ * Unfortunately, neither the fields, nor the convenience methods (e.g.
StatisticsData.add, StatisticsData.negate)
+ * are available here as they are package protected, so we cannot reuse them.
+ */
+ public static class LlapFileSystemStatisticsData {
+ long bytesRead;
+ long bytesWritten;
+ int readOps;
+ int largeReadOps;
+ int writeOps;
+
+ public LlapFileSystemStatisticsData(FileSystem.Statistics.StatisticsData
fsStats) {
+ this.bytesRead = fsStats.getBytesRead();
+ this.bytesWritten = fsStats.getBytesWritten();
+ this.readOps = fsStats.getReadOps();
+ this.largeReadOps = fsStats.getLargeReadOps();
+ this.writeOps = fsStats.getWriteOps();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(" bytesRead: ").append(bytesRead);
+ sb.append(" bytesWritten: ").append(bytesWritten);
+ sb.append(" readOps: ").append(readOps);
+ sb.append(" largeReadOps: ").append(largeReadOps);
+ sb.append(" writeOps: ").append(writeOps);
+ return sb.toString();
+ }
+
+ public LlapFileSystemStatisticsData
merge(FileSystem.Statistics.StatisticsData other) {
+ this.bytesRead += other.getBytesRead();
+ this.bytesWritten += other.getBytesWritten();
+ this.readOps += other.getReadOps();
+ this.largeReadOps += other.getLargeReadOps();
+ this.writeOps += other.getWriteOps();
+ return this;
+ }
+
+ public LlapFileSystemStatisticsData subtract(LlapFileSystemStatisticsData
other) {
+ if (other == null){
+ return this;
+ }
+ this.bytesRead -= other.bytesRead;
+ this.bytesWritten -= other.bytesWritten;
+ this.readOps -= other.readOps;
+ this.largeReadOps -= other.largeReadOps;
+ this.writeOps -= other.writeOps;
+ return this;
+ }
+ }
+}
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 006634dd725..f7cfbd77619 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -18,17 +18,12 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -125,109 +120,6 @@ public class LlapUtil {
return (components == null || components.length != 3) ? principal :
components[0];
}
- public static List<StatisticsData> getStatisticsForScheme(final String
scheme,
- final List<StatisticsData> stats) {
- List<StatisticsData> result = new ArrayList<>();
- if (stats != null && scheme != null) {
- for (StatisticsData s : stats) {
- if (s.getScheme().equalsIgnoreCase(scheme)) {
- result.add(s);
- }
- }
- }
- return result;
- }
-
- public static Map<String, FileSystem.Statistics>
getCombinedFileSystemStatistics() {
- final List<FileSystem.Statistics> allStats = FileSystem.getAllStatistics();
- final Map<String, FileSystem.Statistics> result = new HashMap<>();
- for (FileSystem.Statistics statistics : allStats) {
- final String scheme = statistics.getScheme();
- if (result.containsKey(scheme)) {
- FileSystem.Statistics existing = result.get(scheme);
- FileSystem.Statistics combined = combineFileSystemStatistics(existing,
statistics);
- result.put(scheme, combined);
- } else {
- result.put(scheme, statistics);
- }
- }
- return result;
- }
-
- private static FileSystem.Statistics combineFileSystemStatistics(final
FileSystem.Statistics s1,
- final FileSystem.Statistics s2) {
- FileSystem.Statistics result = new FileSystem.Statistics(s1);
- result.incrementReadOps(s2.getReadOps());
- result.incrementLargeReadOps(s2.getLargeReadOps());
- result.incrementWriteOps(s2.getWriteOps());
- result.incrementBytesRead(s2.getBytesRead());
- result.incrementBytesWritten(s2.getBytesWritten());
- return result;
- }
-
- public static List<StatisticsData> cloneThreadLocalFileSystemStatistics() {
- List<StatisticsData> result = new ArrayList<>();
- // thread local filesystem stats is private and cannot be cloned. So make
a copy to new class
- for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
- result.add(new StatisticsData(statistics.getScheme(),
statistics.getThreadStatistics()));
- }
- return result;
- }
-
- public static class StatisticsData {
- long bytesRead;
- long bytesWritten;
- int readOps;
- int largeReadOps;
- int writeOps;
- String scheme;
-
- public StatisticsData(String scheme, FileSystem.Statistics.StatisticsData
fsStats) {
- this.scheme = scheme;
- this.bytesRead = fsStats.getBytesRead();
- this.bytesWritten = fsStats.getBytesWritten();
- this.readOps = fsStats.getReadOps();
- this.largeReadOps = fsStats.getLargeReadOps();
- this.writeOps = fsStats.getWriteOps();
- }
-
- public long getBytesRead() {
- return bytesRead;
- }
-
- public long getBytesWritten() {
- return bytesWritten;
- }
-
- public int getReadOps() {
- return readOps;
- }
-
- public int getLargeReadOps() {
- return largeReadOps;
- }
-
- public int getWriteOps() {
- return writeOps;
- }
-
- public String getScheme() {
- return scheme;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(" scheme: ").append(scheme);
- sb.append(" bytesRead: ").append(bytesRead);
- sb.append(" bytesWritten: ").append(bytesWritten);
- sb.append(" readOps: ").append(readOps);
- sb.append(" largeReadOps: ").append(largeReadOps);
- sb.append(" writeOps: ").append(writeOps);
- return sb.toString();
- }
- }
-
public static String getAmHostNameFromAddress(InetSocketAddress address,
Configuration conf) {
if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_USE_FQDN))
{
return address.getHostName();
diff --git
a/llap-common/src/test/org/apache/hadoop/hive/llap/TestLlapThreadLocalStatistics.java
b/llap-common/src/test/org/apache/hadoop/hive/llap/TestLlapThreadLocalStatistics.java
new file mode 100644
index 00000000000..8955ccdec4f
--- /dev/null
+++
b/llap-common/src/test/org/apache/hadoop/hive/llap/TestLlapThreadLocalStatistics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestLlapThreadLocalStatistics {
+
+ private static final ThreadMXBean mxBean = LlapUtil.initThreadMxBean();
+ private static final String FILE = "file";
+ private static final String HDFS = "hdfs";
+
+ @Test
+ public void testEmptyStatistics() {
+ LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean,
new ArrayList<>());
+ LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean,
new ArrayList<>());
+ Assert.assertEquals(0,
after.subtract(before).schemeToThreadLocalStats.keySet().size());
+ }
+
+ @Test
+ public void testCpuTimeUserTime() throws Exception {
+ LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean,
new ArrayList<>());
+ Assert.assertTrue("cpuTime should be >0", before.cpuTime > 0);
+ Assert.assertTrue("userTime should be >0", before.userTime > 0);
+
+ Thread.sleep(100);
+
+ LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean,
new ArrayList<>());
+ Assert.assertTrue("cpuTime should increase", after.cpuTime >
before.cpuTime);
+ // userTime assertion is flaky, not checked here
+ }
+
+ @Test
+ public void testCountersMergedForTheSameScheme() {
+ LlapThreadLocalStatistics stats = new LlapThreadLocalStatistics(mxBean,
+ createMockStatistics(new String[]{FILE, HDFS, HDFS}, new Integer[]{1,
1, 1}));
+ Assert.assertEquals(1, stats.schemeToThreadLocalStats.get(FILE).bytesRead);
+ Assert.assertEquals(2, stats.schemeToThreadLocalStats.get(HDFS).bytesRead);
+ }
+
+ @Test
+ public void testCountersBeforeAfter() {
+ LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean,
+ createMockStatistics(new String[]{FILE, HDFS, HDFS}, new Integer[]{1,
1, 1}));
+ LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean,
+ createMockStatistics(new String[]{FILE, HDFS, HDFS}, new Integer[]{3,
1, 4}));
+
+ Assert.assertEquals(1,
before.schemeToThreadLocalStats.get(FILE).bytesRead);
+ Assert.assertEquals(2,
before.schemeToThreadLocalStats.get(HDFS).bytesRead);
+ Assert.assertEquals(3, after.schemeToThreadLocalStats.get(FILE).bytesRead);
+ Assert.assertEquals(5, after.schemeToThreadLocalStats.get(HDFS).bytesRead);
+
+ after.subtract(before);
+
+ // file: 3 - 1
+ Assert.assertEquals(2, after.schemeToThreadLocalStats.get(FILE).bytesRead);
+ // hdfs: (1 + 4) - (1 + 1)
+ Assert.assertEquals(3, after.schemeToThreadLocalStats.get(HDFS).bytesRead);
+ }
+
+ private List<FileSystem.Statistics> createMockStatistics(String[] schemes,
Integer[] values) {
+ return IntStream.range(0, schemes.length).mapToObj(i -> {
+ FileSystem.Statistics stat = new FileSystem.Statistics(schemes[i]);
+ stat.incrementBytesRead(values[i]);
+ return stat;
+ }).collect(Collectors.toList());
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 873bdc73500..62db0588812 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,12 +17,8 @@
*/
package org.apache.hadoop.hive.llap.daemon.impl;
-import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.Stack;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -32,9 +28,8 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.llap.LlapThreadLocalStatistics;
import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.counters.FileSystemCounter;
@@ -110,19 +105,12 @@ public class StatsRecordingThreadPool extends
ThreadPoolExecutor {
}
// clone thread local file system statistics
- List<LlapUtil.StatisticsData> statsBefore =
LlapUtil.cloneThreadLocalFileSystemStatistics();
- long cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime(),
- userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
+ LlapThreadLocalStatistics statsBefore = new
LlapThreadLocalStatistics(mxBean);
setupMDCFromNDC(actualCallable);
try {
return actualCallable.call();
} finally {
- if (mxBean != null) {
- cpuTime = mxBean.getCurrentThreadCpuTime() - cpuTime;
- userTime = mxBean.getCurrentThreadUserTime() - userTime;
- }
- updateCounters(statsBefore, actualCallable, cpuTime, userTime);
-
+ updateCounters(statsBefore, actualCallable);
MDC.clear();
}
}
@@ -161,93 +149,23 @@ public class StatsRecordingThreadPool extends
ThreadPoolExecutor {
}
}
- /**
- * LLAP IO related counters.
- */
- public enum LlapExecutorCounters {
- EXECUTOR_CPU_NS,
- EXECUTOR_USER_NS;
-
- }
-
- private void updateCounters(final List<LlapUtil.StatisticsData>
statsBefore,
- final Callable<V> actualCallable, long cpuTime, long userTime) {
+ private void updateCounters(final LlapThreadLocalStatistics statsBefore,
+ final Callable<V> actualCallable) {
Thread thread = Thread.currentThread();
- TezCounters tezCounters = null;
- // add tez counters for task execution and llap io
- if (actualCallable instanceof TaskRunner2Callable) {
- TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable)
actualCallable;
- // counters for task execution side
- tezCounters =
taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
- } else if (actualCallable instanceof TezCounterSource) {
- // Other counter sources (currently used in LLAP IO).
- tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
- } else {
- LOG.warn("Unexpected callable {}; cannot get counters",
actualCallable);
- }
+ final TezCounters tezCounters = getTezCounters(actualCallable);
if (tezCounters != null) {
- if (cpuTime >= 0 && userTime >= 0) {
-
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
-
tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
- }
if (statsBefore != null) {
- // if there are multiple stats for the same scheme (from different
NameNode), this
- // method will squash them together
- Map<String, FileSystem.Statistics> schemeToStats = LlapUtil
- .getCombinedFileSystemStatistics();
- for (Map.Entry<String, FileSystem.Statistics> entry :
schemeToStats.entrySet()) {
- final String scheme = entry.getKey();
- FileSystem.Statistics statistics = entry.getValue();
- FileSystem.Statistics.StatisticsData threadFSStats = statistics
- .getThreadStatistics();
- List<LlapUtil.StatisticsData> allStatsBefore = LlapUtil
- .getStatisticsForScheme(scheme, statsBefore);
- long bytesReadDelta = 0;
- long bytesWrittenDelta = 0;
- long readOpsDelta = 0;
- long largeReadOpsDelta = 0;
- long writeOpsDelta = 0;
- // there could be more scheme after execution as execution might
be accessing a
- // different filesystem. So if we don't find a matching scheme
before execution we
- // just use the after execution values directly without computing
delta difference
- if (allStatsBefore != null && !allStatsBefore.isEmpty()) {
- for (LlapUtil.StatisticsData sb : allStatsBefore) {
- bytesReadDelta += threadFSStats.getBytesRead() -
sb.getBytesRead();
- bytesWrittenDelta += threadFSStats.getBytesWritten() -
sb.getBytesWritten();
- readOpsDelta += threadFSStats.getReadOps() - sb.getReadOps();
- largeReadOpsDelta += threadFSStats.getLargeReadOps() -
sb.getLargeReadOps();
- writeOpsDelta += threadFSStats.getWriteOps() -
sb.getWriteOps();
- }
- } else {
- bytesReadDelta = threadFSStats.getBytesRead();
- bytesWrittenDelta = threadFSStats.getBytesWritten();
- readOpsDelta = threadFSStats.getReadOps();
- largeReadOpsDelta = threadFSStats.getLargeReadOps();
- writeOpsDelta = threadFSStats.getWriteOps();
- }
- tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ)
- .increment(bytesReadDelta);
- tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN)
- .increment(bytesWrittenDelta);
- tezCounters.findCounter(scheme,
FileSystemCounter.READ_OPS).increment(readOpsDelta);
- tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS)
- .increment(largeReadOpsDelta);
- tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS)
- .increment(writeOpsDelta);
+ LlapThreadLocalStatistics currentStats = new
LlapThreadLocalStatistics(mxBean);
+ currentStats.subtract(statsBefore).fill(tezCounters);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updated stats: instance: {} thread name: {} thread
id: {} scheme: {} " +
- "bytesRead: {} bytesWritten: {} readOps: {}
largeReadOps: {} writeOps: {}",
- actualCallable.getClass().getSimpleName(), thread.getName(),
thread.getId(),
- scheme, bytesReadDelta, bytesWrittenDelta, readOpsDelta,
largeReadOpsDelta,
- writeOpsDelta);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updated stats: instance: {} thread name: {} thread id:
{} stats: {}",
+ actualCallable.getClass().getSimpleName(), thread.getName(),
thread.getId(), currentStats);
}
} else {
LOG.warn("File system statistics snapshot before execution of thread
is null." +
- "Thread name: {} id: {} allStats: {}", thread.getName(),
thread.getId(),
- statsBefore);
+ "Thread name: {} id: {}", thread.getName(), thread.getId());
}
} else {
LOG.warn("TezCounters is null for callable type: {}",
@@ -255,4 +173,20 @@ public class StatsRecordingThreadPool extends
ThreadPoolExecutor {
}
}
}
+
+ private static <V> TezCounters getTezCounters(Callable<V> actualCallable) {
+ TezCounters tezCounters = null;
+ // add tez counters for task execution and llap io
+ if (actualCallable instanceof TaskRunner2Callable) {
+ TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable)
actualCallable;
+ // counters for task execution side
+ tezCounters =
taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
+ } else if (actualCallable instanceof TezCounterSource) {
+ // Other counter sources (currently used in LLAP IO).
+ tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
+ } else {
+ LOG.warn("Unexpected callable {}; cannot get counters", actualCallable);
+ }
+ return tezCounters;
+ }
}