deniskuzZ commented on a change in pull request #2563:
URL: https://github.com/apache/hive/pull/2563#discussion_r685006096
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf)
throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL,
TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter
%d").build();
+ executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0,
reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache,
obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN,
counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String,
Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
Review comment:
Are you cleaning the whole cache when tez task doesn't have any deltas
to report? What about data from other tasks that touched diff
tables/partitions.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf)
throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
Review comment:
should we even create an instance of DeltaFilesMetricReporter when
MetastoreConf.ConfVars.METRICS_ENABLED || METASTORE_ACIDMETRICS_EXT_ON are set
to false?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf)
throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL,
TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter
%d").build();
+ executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0,
reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache,
obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN,
counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String,
Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
+ } else {
+ // if there is no counter corresponding to a cache entry, remove from
cache
+ ConcurrentMap<String, Integer> cacheMap = cache.asMap();
+ cacheMap.keySet().stream().filter(key ->
counters.findCounter(group.getName(), key).getValue() == 0)
+ .forEach(cache::invalidate);
}
- if (counter.getValue() > threshold) {
- if (topN.size() == maxCacheSize) {
- Pair<String, Integer> lowest = topN.peek();
- if (lowest != null && counter.getValue() > lowest.getValue()) {
- cache.invalidate(lowest.getKey());
- }
- }
- if (topN.size() < maxCacheSize) {
- topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
- cache.put(counter.getName(), (int) counter.getValue());
+ // update existing cache entries or add new entries
+ for (TezCounter counter : group) {
+ Integer prev = cache.getIfPresent(counter.getName());
+ if (prev != null && prev != counter.getValue()) {
+ cache.invalidate(counter.getName());
}
+ topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
Review comment:
Would it work as topN cache? It's gonna evict entry when the limit is
exceeded, even if that entry has a higher value than inserted
(least-recently-used eviction)
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf)
throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL,
TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter
%d").build();
+ executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0,
reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache,
obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN,
counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String,
Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
+ } else {
+ // if there is no counter corresponding to a cache entry, remove from
cache
+ ConcurrentMap<String, Integer> cacheMap = cache.asMap();
+ cacheMap.keySet().stream().filter(key ->
counters.findCounter(group.getName(), key).getValue() == 0)
+ .forEach(cache::invalidate);
}
- if (counter.getValue() > threshold) {
- if (topN.size() == maxCacheSize) {
- Pair<String, Integer> lowest = topN.peek();
- if (lowest != null && counter.getValue() > lowest.getValue()) {
- cache.invalidate(lowest.getKey());
- }
- }
- if (topN.size() < maxCacheSize) {
- topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
- cache.put(counter.getName(), (int) counter.getValue());
+ // update existing cache entries or add new entries
+ for (TezCounter counter : group) {
+ Integer prev = cache.getIfPresent(counter.getName());
+ if (prev != null && prev != counter.getValue()) {
+ cache.invalidate(counter.getName());
}
+ topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
+ cache.put(counter.getName(), (int) counter.getValue());
}
- });
+ } catch (Exception e) {
+ LOG.warn("Caught exception while trying to update delta metrics cache.
Invalidating cache", e);
+ try {
+ cache.invalidateAll();
+ } catch (Exception x) {
+ LOG.warn("Caught exception while trying to invalidate cache", x);
+ }
+ }
}
- public static void mergeDeltaFilesStats(AcidDirectory dir, long
checkThresholdInSec,
- float deltaPctThreshold, EnumMap<DeltaFilesMetricType, Map<String,
Integer>> deltaFilesStats,
- Configuration conf) throws IOException {
+ /**
+ * Update EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>>
deltaFilesStats with {@link AcidDirectory}
+ * contents
+ */
+ public static void mergeDeltaFilesStats(AcidDirectory dir, long
checkThresholdInSec, float deltaPctThreshold,
+ EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>>
deltaFilesStats, Configuration conf) throws IOException {
+
+ int deltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
Review comment:
it's inefficient to access conf values for every table/partitions
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -219,8 +226,33 @@ public static void mergeDeltaFilesStats(AcidDirectory dir,
long checkThresholdIn
logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas,
numSmallDeltas);
String path = getRelPath(dir);
- newDeltaFilesStats(numObsoleteDeltas, numDeltas, numSmallDeltas)
- .forEach((type, cnt) -> deltaFilesStats.computeIfAbsent(type, v -> new
HashMap<>()).put(path, cnt));
+
+ filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold,
deltaFilesStats, path, maxCacheSize);
+ filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas,
obsoleteDeltasThreshold, deltaFilesStats,
+ path, maxCacheSize);
+ filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas,
deltasThreshold, deltaFilesStats,
+ path, maxCacheSize);
+ }
+
+ /**
+ * Add partition and delta count to deltaFilesStats if the delta count is
over the recording threshold and it is in
+ * the top {@link HiveConf.ConfVars#HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE}
deltas.
+ */
+ private static void filterAndAddToDeltaFilesStats(DeltaFilesMetricType type,
int deltaCount, int deltasThreshold,
+ EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>>
deltaFilesStats, String path, int maxCacheSize) {
+ if (deltaCount > deltasThreshold) {
+ Queue<Pair<String,Integer>> pairQueue = deltaFilesStats.get(type);
+ if (pairQueue != null && pairQueue.size() == maxCacheSize) {
+ Pair<String, Integer> lowest = pairQueue.peek();
+ if (lowest != null && deltaCount > lowest.getValue()) {
+ pairQueue.poll();
+ }
+ }
+ if (pairQueue == null || pairQueue.size() < maxCacheSize) {
+ deltaFilesStats.computeIfAbsent(type,
+ v -> (new PriorityBlockingQueue<>(maxCacheSize,
getComparator()))).add(Pair.of(path, deltaCount));
Review comment:
Why do you need a blocking queue here? Would it be accessed by multiple
threads?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf)
throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL,
TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter
%d").build();
+ executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0,
reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache,
obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN,
counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String,
Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
+ } else {
+ // if there is no counter corresponding to a cache entry, remove from
cache
+ ConcurrentMap<String, Integer> cacheMap = cache.asMap();
Review comment:
This is supposed to be a global cache. I am not sure I understand it
right. But looks like if some part is not present in tez counters it would be
removed from global cache.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -267,32 +299,26 @@ private static String getRelPath(AcidUtils.Directory
directory) {
directory.getPath().getName();
}
- private static EnumMap<DeltaFilesMetricType, Integer> newDeltaFilesStats(int
numObsoleteDeltas, int numDeltas, int numSmallDeltas) {
- return new EnumMap<DeltaFilesMetricType,
Integer>(DeltaFilesMetricType.class) {{
- put(NUM_OBSOLETE_DELTAS, numObsoleteDeltas);
- put(NUM_DELTAS, numDeltas);
- put(NUM_SMALL_DELTAS, numSmallDeltas);
- }};
- }
-
public static void createCountersForAcidMetrics(TezCounters tezCounters,
JobConf jobConf) {
- if (HiveConf.getBoolVar(jobConf,
HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
- MetastoreConf.getBoolVar(jobConf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
+ try {
+ if (HiveConf.getBoolVar(jobConf,
HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf
+ .getBoolVar(jobConf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
- Arrays.stream(DeltaFilesMetricType.values())
- .filter(type -> jobConf.get(type.name()) != null)
- .forEach(type ->
-
Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name())).forEach(
- (path, cnt) -> tezCounters.findCounter(type.value,
path).setValue(Long.parseLong(cnt))
- )
- );
+ Arrays.stream(DeltaFilesMetricType.values()).filter(type ->
jobConf.get(type.name()) != null).forEach(
+ type ->
Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name()))
+ .forEach((path, cnt) -> tezCounters.findCounter(type.value,
path).setValue(Long.parseLong(cnt))));
+ }
+ } catch (Exception e) {
Review comment:
I don't see where a checked exception could be thrown here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]