klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787044907



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -435,55 +187,71 @@ private void initObjectsForMetrics() throws Exception {
         .getObjectName());
   }
 
-  private void initCachesForMetrics(HiveConf conf) {
-    int maxCacheSize = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE);
-    long duration = HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, 
TimeUnit.SECONDS);
-
-    deltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
-    smallDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize, 
getComparator());
-    obsoleteDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize, 
getComparator());
-
-    deltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(deltaTopN, 
notification))
-      .softValues()
-      .build();
-
-    smallDeltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(smallDeltaTopN, 
notification))
-      .softValues()
-      .build();
-
-    obsoleteDeltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(obsoleteDeltaTopN, 
notification))
-      .softValues()
-      .build();
-  }
-
-  private static Comparator<Pair<String, Integer>> getComparator() {
-    return Comparator.comparing(Pair::getValue);
-  }
+  private final class ReportingTask implements Runnable {
 
-  private void removalPredicate(BlockingQueue<Pair<String, Integer>> topN, 
RemovalNotification notification) {
-    topN.removeIf(item -> item.getKey().equals(notification.getKey()));
-  }
+    private final TxnStore txnHandler;
 
-  private final class ReportingTask implements Runnable {
+    private ReportingTask(TxnStore txnHandler) {
+      this.txnHandler = txnHandler;
+    }
     @Override
     public void run() {
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
-        obsoleteDeltaCache.cleanUp();
-        obsoleteDeltaObject.updateAll(obsoleteDeltaCache.asMap());
+        try {
+          LOG.debug("Called reporting task.");
+          List<CompactionMetricsData> deltas = 
txnHandler.getTopCompactionMetricsDataPerType(maxCacheSize);
+          Map<String, Integer> deltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == 
CompactionMetricsData.MetricType.NUM_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), 
item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          deltaObject.updateAll(deltasMap);
+
+          Map<String, Integer> smallDeltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == 
CompactionMetricsData.MetricType.NUM_SMALL_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), 
item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          smallDeltaObject.updateAll(smallDeltasMap);
+
+          Map<String, Integer> obsoleteDeltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == 
CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), 
item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          obsoleteDeltaObject.updateAll(obsoleteDeltasMap);
+        } catch (MetaException e) {

Review comment:
       Maybe catch all Throwables here just in case? (and also in run())




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to