klcopp commented on a change in pull request #2563:
URL: https://github.com/apache/hive/pull/2563#discussion_r686876814
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf)
throws Exception {
private void configure(HiveConf conf) throws Exception {
acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ if (acidMetricsExtEnabled) {
- deltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
- obsoleteDeltasThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
- initCachesForMetrics(conf);
- initObjectsForMetrics();
+ initCachesForMetrics(conf);
+ initObjectsForMetrics();
- long reportingInterval = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL,
TimeUnit.SECONDS);
+ long reportingInterval =
+ HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("DeltaFilesMetricReporter %d")
- .build();
- executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
- executorService.scheduleAtFixedRate(
- new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
+ executorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleAtFixedRate(new ReportingTask(), 0,
reportingInterval, TimeUnit.SECONDS);
- LOG.info("Started DeltaFilesMetricReporter thread");
+ LOG.info("Started DeltaFilesMetricReporter thread");
+ }
}
public void submit(TezCounters counters) {
if (acidMetricsExtEnabled) {
- updateMetrics(NUM_OBSOLETE_DELTAS,
- obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
- counters);
- updateMetrics(NUM_DELTAS,
- deltaCache, deltaTopN, deltasThreshold,
- counters);
- updateMetrics(NUM_SMALL_DELTAS,
- smallDeltaCache, smallDeltaTopN, deltasThreshold,
- counters);
+ updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache,
obsoleteDeltaTopN, counters);
+ updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+ updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN,
counters);
}
}
+ /**
+ * Copy counters to caches.
+ */
private void updateMetrics(DeltaFilesMetricType metric, Cache<String,
Integer> cache, Queue<Pair<String, Integer>> topN,
- int threshold, TezCounters counters) {
- counters.getGroup(metric.value).forEach(counter -> {
- Integer prev = cache.getIfPresent(counter.getName());
- if (prev != null && prev != counter.getValue()) {
- cache.invalidate(counter.getName());
+ TezCounters counters) {
+ try {
+ CounterGroup group = counters.getGroup(metric.value);
+ // if the group is empty, clear the cache
+ if (group.size() == 0) {
+ cache.invalidateAll();
+ } else {
+ // if there is no counter corresponding to a cache entry, remove from cache
+ ConcurrentMap<String, Integer> cacheMap = cache.asMap();
+ cacheMap.keySet().stream().filter(key ->
counters.findCounter(group.getName(), key).getValue() == 0)
+ .forEach(cache::invalidate);
}
- if (counter.getValue() > threshold) {
- if (topN.size() == maxCacheSize) {
- Pair<String, Integer> lowest = topN.peek();
- if (lowest != null && counter.getValue() > lowest.getValue()) {
- cache.invalidate(lowest.getKey());
- }
- }
- if (topN.size() < maxCacheSize) {
- topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
- cache.put(counter.getName(), (int) counter.getValue());
+ // update existing cache entries or add new entries
+ for (TezCounter counter : group) {
+ Integer prev = cache.getIfPresent(counter.getName());
+ if (prev != null && prev != counter.getValue()) {
+ cache.invalidate(counter.getName());
}
+ topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
Review comment:
mergeDeltaFilesStats already filters on whether a partition belongs in the topN,
and counters are only created for partitions that do.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]