lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787544691
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -435,55 +187,71 @@ private void initObjectsForMetrics() throws Exception {
.getObjectName());
}
- private void initCachesForMetrics(HiveConf conf) {
- int maxCacheSize = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE);
- long duration = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION,
TimeUnit.SECONDS);
-
- deltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
- smallDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize,
getComparator());
- obsoleteDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize,
getComparator());
-
- deltaCache = CacheBuilder.newBuilder()
- .expireAfterWrite(duration, TimeUnit.SECONDS)
- .removalListener(notification -> removalPredicate(deltaTopN,
notification))
- .softValues()
- .build();
-
- smallDeltaCache = CacheBuilder.newBuilder()
- .expireAfterWrite(duration, TimeUnit.SECONDS)
- .removalListener(notification -> removalPredicate(smallDeltaTopN,
notification))
- .softValues()
- .build();
-
- obsoleteDeltaCache = CacheBuilder.newBuilder()
- .expireAfterWrite(duration, TimeUnit.SECONDS)
- .removalListener(notification -> removalPredicate(obsoleteDeltaTopN,
notification))
- .softValues()
- .build();
- }
-
- private static Comparator<Pair<String, Integer>> getComparator() {
- return Comparator.comparing(Pair::getValue);
- }
+ private final class ReportingTask implements Runnable {
- private void removalPredicate(BlockingQueue<Pair<String, Integer>> topN,
RemovalNotification notification) {
- topN.removeIf(item -> item.getKey().equals(notification.getKey()));
- }
+ private final TxnStore txnHandler;
- private final class ReportingTask implements Runnable {
+ private ReportingTask(TxnStore txnHandler) {
+ this.txnHandler = txnHandler;
+ }
@Override
public void run() {
Metrics metrics = MetricsFactory.getInstance();
if (metrics != null) {
- obsoleteDeltaCache.cleanUp();
- obsoleteDeltaObject.updateAll(obsoleteDeltaCache.asMap());
+ try {
+ LOG.debug("Called reporting task.");
+ List<CompactionMetricsData> deltas =
txnHandler.getTopCompactionMetricsDataPerType(maxCacheSize);
+ Map<String, Integer> deltasMap = deltas.stream()
+ .filter(d -> d.getMetricType() ==
CompactionMetricsData.MetricType.NUM_DELTAS).collect(
+ Collectors.toMap(item -> getDeltaCountKey(item.getDbName(),
item.getTblName(), item.getPartitionName()),
+ CompactionMetricsData::getMetricValue));
+ deltaObject.updateAll(deltasMap);
+
+ Map<String, Integer> smallDeltasMap = deltas.stream()
+ .filter(d -> d.getMetricType() ==
CompactionMetricsData.MetricType.NUM_SMALL_DELTAS).collect(
+ Collectors.toMap(item -> getDeltaCountKey(item.getDbName(),
item.getTblName(), item.getPartitionName()),
+ CompactionMetricsData::getMetricValue));
+ smallDeltaObject.updateAll(smallDeltasMap);
+
+ Map<String, Integer> obsoleteDeltasMap = deltas.stream()
+ .filter(d -> d.getMetricType() ==
CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS).collect(
+ Collectors.toMap(item -> getDeltaCountKey(item.getDbName(),
item.getTblName(), item.getPartitionName()),
+ CompactionMetricsData::getMetricValue));
+ obsoleteDeltaObject.updateAll(obsoleteDeltasMap);
+ } catch (MetaException e) {
Review comment:
Right, I will do that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org