klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r782811556



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       COMPACTOR_INITIATOR_ON could be false on the HS2 running this Worker, but metrics should still be collected.
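   A minimal sketch of the suggested gating (illustrative only; it assumes the Worker-side check simply drops the initiator flag):
   ```
   // Sketch: gate Worker metrics on the HMS metrics switch and the extended
   // ACID metrics switch only - COMPACTOR_INITIATOR_ON may be off on this HS2.
   metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
       MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
   ```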

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       COMPACTOR_INITIATOR_ON also controls whether the Cleaner runs, so this 
line is unnecessary... but if you want to leave it in for posterity/the future, 
I understand.

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       If there's any chance that the parameters might change later, I recommend introducing a CompactionMetricsDataRequest object.
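   For illustration, a hedged sketch of what such a wrapper could look like (field numbering and optionality are assumptions, not part of the patch):
   ```
   // Hypothetical request object: new optional fields can be added later
   // without breaking the service method signature.
   struct CompactionMetricsDataRequest {
     1: required string dbName,
     2: required string tblName,
     3: optional string partitionName,
     4: required CompactionMetricsMetricType type
   }
   ```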

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
##########
@@ -39,199 +45,396 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import java.util.EnumMap;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class TestDeltaFilesMetrics extends CompactorTest  {
 
   private void setUpHiveConf() {
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE, 2);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, 7200, TimeUnit.SECONDS);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 1, TimeUnit.SECONDS);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD, 0.15f);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
   }
 
-  private void initAndCollectFirstMetrics() throws Exception {
-    MetricsFactory.close();
-    MetricsFactory.init(conf);
+  @After
+  public void tearDown() throws Exception {
+    DeltaFilesMetricReporter.close();
+  }
 
-    DeltaFilesMetricReporter.init(conf);
 
-    TezCounters tezCounters = new TezCounters();
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=1").setValue(200);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid_v2").setValue(250);
+  static void verifyMetricsMatch(Map<String, String> expected, Map<String, String> actual) {
+    Assert.assertTrue("Actual metrics " + actual + " don't match expected: " + expected,
+        equivalent(expected, actual));
+  }
 
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=1").setValue(150);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=3").setValue(250);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid_v2").setValue(200);
+  private static boolean equivalent(Map<String, String> lhs, Map<String, String> rhs) {
+    return lhs.size() == rhs.size() && Maps.difference(lhs, rhs).areEqual();
+  }
 
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=1").setValue(250);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=2").setValue(200);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid_v2").setValue(100);
+  static Map<String, String> gaugeToMap(String metric) throws Exception {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName oname = new ObjectName(DeltaFilesMetricReporter.OBJECT_NAME_PREFIX + metric);
+    MBeanInfo mbeanInfo = mbs.getMBeanInfo(oname);
 
-    DeltaFilesMetricReporter.getInstance().submit(tezCounters, null);
-    Thread.sleep(1000);
+    Map<String, String> result = new HashMap<>();
+    for (MBeanAttributeInfo attr : mbeanInfo.getAttributes()) {
+      result.put(attr.getName(), String.valueOf(mbs.getAttribute(oname, attr.getName())));
+    }
+    return result;
   }
 
-  @After
-  public void tearDown() {
-    DeltaFilesMetricReporter.close();
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
   }
 
   @Test
-  public void testDeltaFilesMetric() throws Exception {
+  public void testDeltaFileMetricPartitionedTable() throws Exception {
     setUpHiveConf();
-    initAndCollectFirstMetrics();
+    String dbName = "default";
+    String tblName = "dp";
+    String partName = "ds=part1";
 
+    Table t = newTable(dbName, tblName, true);
+    List<LockComponent> components = new ArrayList<>();
+
+    Partition p = newPartition(t, "part1");
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 20);
+
+    components.add(createLockComponent(dbName, tblName, partName));
+
+    burnThroughTransactions(dbName, tblName, 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId(dbName, tblName, txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator();
+
+    TimeUnit.SECONDS.sleep(1);

Review comment:
       Since the reporting interval is also set to 1 s, just to be on the safe 
side, maybe increase this sleep to 1.5 s?
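   e.g. (sketch; TimeUnit.SECONDS cannot express fractions, so milliseconds would be needed):
   ```
   // Sketch of the suggested change: sleep slightly longer than the 1 s reporting interval
   TimeUnit.MILLISECONDS.sleep(1500);
   ```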

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));

Review comment:
       We should be really careful about throwing exceptions... if metric 
collection fails, the Initiator/Worker/Cleaner should continue with no issues.
   (Also, IMO logging a warning is enough, but it's up to you)
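   A sketch of the log-and-continue alternative (a hypothetical helper, assuming callers can treat a failed lookup as "no cached metric"; not the actual patch):
   ```
   // Sketch: best-effort wrapper around the metrics lookup.
   private CompactionMetricsData getCompactionMetricsDataQuietly(String dbName, String tblName,
       String partitionName, CompactionMetricsData.MetricType type) {
     try {
       return getCompactionMetricsData(dbName, tblName, partitionName, type);
     } catch (MetaException e) {
       // Metrics are best-effort: warn and keep the Initiator/Worker/Cleaner running.
       LOG.warn("Unable to collect compaction metrics data, skipping metrics update", e);
       return null;
     }
   }
   ```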

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -308,7 +307,22 @@
       "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXN_COMPONENTS\" " +
           "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
       " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING COUNT(\"TXN_ID\") > ?";
-
+  private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
+      "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
+      "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND \"CMC_METRIC_TYPE\" = ?";
+  private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
+      "* FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC";

Review comment:
       Since the COMPACTION_METRICS_CACHE schema might change in the future, 
it's better to select each column by name instead of "select *"
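   A sketch with explicit columns (names taken from the COMPACTION_METRICS_CACHE schema elsewhere in this PR; CMC_VERSION is inferred from the version reads below):
   ```
   // Sketch: enumerate the columns so a future schema change cannot silently
   // shift the result-set indexes read by getTopCompactionMetricsDataPerType.
   private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
       "\"CMC_DATABASE\", \"CMC_TABLE\", \"CMC_PARTITION\", \"CMC_METRIC_TYPE\", \"CMC_METRIC_VALUE\", " +
       "\"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? " +
       "ORDER BY \"CMC_METRIC_VALUE\" DESC";
   ```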

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,

Review comment:
       For example: the threshold is deltas=5, the cache contains an entry with deltas=7, but this AcidDir now has only 2 deltas. Will the cache entry be removed?
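   One possible shape of such a removal, mirroring the worker-side else-branch later in this PR (sketch only):
   ```
   if (numDeltas > deltasThreshold) {
     updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
         txnHandler);
   } else {
     // Sketch: drop the stale cache entry once the count falls back under the threshold
     txnHandler.removeCompactionMetricsData(dbName, tableName, partitionName,
         CompactionMetricsData.MetricType.NUM_DELTAS);
   }
   ```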

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
##########
@@ -8831,6 +8833,34 @@ public void mark_failed(CompactionInfoStruct cr) throws MetaException {
     getTxnHandler().markFailed(CompactionInfo.compactionStructToInfo(cr));
   }
 
+  @Override
+  public CompactionMetricsDataResponse get_compaction_metrics_data(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type) throws MetaException {
+    CompactionMetricsData metricsData =
+        getTxnHandler().getCompactionMetricsData(dbName, tblName, partitionName,
+            CompactionMetricsDataConverter.thriftCompactionMetricType2DbType(type));
+    CompactionMetricsDataResponse response = new CompactionMetricsDataResponse();
+    if (metricsData != null) {
+      response.setData(CompactionMetricsDataConverter.dataToStruct(metricsData));
+    }
+    return response;
+  }
+
+  @Override
+  public boolean update_compaction_metrics_data(CompactionMetricsDataStruct struct, int version) throws MetaException {
+      return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(struct), version);

Review comment:
       Could `struct` be null?
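   If it can be, a defensive sketch (hypothetical, not part of the patch):
   ```
   @Override
   public boolean update_compaction_metrics_data(CompactionMetricsDataStruct struct, int version) throws MetaException {
     if (struct == null) {
       // Sketch: fail fast with a clear message instead of an NPE inside the converter
       throw new MetaException("CompactionMetricsDataStruct must not be null");
     }
     return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(struct), version);
   }
   ```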

##########
File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
##########
@@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
 
+-- HIVE-25842
+CREATE TABLE COMPACTION_METRICS_CACHE (
+  CMC_DATABASE varchar(128) NOT NULL,
+  CMC_TABLE varchar(128) NOT NULL,
+  CMC_PARTITION varchar(767),
+  CMC_METRIC_TYPE varchar(128) NOT NULL,
+  CMC_METRIC_VALUE integer NOT NULL,

Review comment:
       Is it a possibility that in the future, for whatever reason, we'll want 
this value to be NULL?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
##########
@@ -81,6 +81,7 @@
   public void setUp() throws Exception {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);

Review comment:
       Does this do anything?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -22,7 +22,24 @@
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;

Review comment:
       +1, thanks!

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)
+  void add_compaction_metrics_data(1: CompactionMetricsDataStruct data) throws(1:MetaException o1)
+  void remove_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       Same as above re: CompactionMetricsDataRequest

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&

Review comment:
       Maybe add a short comment highlighting that the **HMS** metrics need to 
be on.
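   Something like (sketch of the suggested comment):
   ```
   // Note: this is the metastore (HMS) metrics switch, not hive.server2.metrics.enabled;
   // HMS metrics must be enabled for delta file metrics to be collected.
   metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
   ```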

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {

Review comment:
       Should be >=, since the threshold definition is "The minimum number of 
active delta files a table/partition must have..."
   
   Same for the others.
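   i.e. (sketch):
   ```
   // ">=": the threshold is the minimum count at which the metric should be reported
   if (numDeltas >= deltasThreshold) {
   ```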

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
 
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
+      CompactionType type) {
+    if (metricsEnabled) {
+      DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName, partName, type, conf, msc);

Review comment:
       The DeltaFilesMetricReporter instance lives in the HMS's memory, but this Worker thread is probably running on an HS2... how does this work?

##########
File path: service/src/java/org/apache/hive/service/server/HiveServer2.java
##########
@@ -214,9 +214,6 @@ public synchronized void init(HiveConf hiveConf) {
     try {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
-        if (MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-          DeltaFilesMetricReporter.init(hiveConf);

Review comment:
       There is also an unnecessary DeltaFilesMetricReporter.close() in this 
file.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -310,81 +143,6 @@ private static String getDeltaCountKey(String dbName, String tableName, String p
     return key.toString();
   }
 
-  private static void logDeltaDirMetrics(AcidDirectory dir, Configuration conf, int numObsoleteDeltas, int numDeltas,
-      int numSmallDeltas) {
-    long loggerFrequency = HiveConf
-        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, TimeUnit.MILLISECONDS);
-    if (loggerFrequency <= 0) {
-      return;
-    }
-    long currentTime = System.currentTimeMillis();
-    if (lastSuccessfulLoggingTime == 0 || currentTime >= lastSuccessfulLoggingTime + loggerFrequency) {
-      lastSuccessfulLoggingTime = currentTime;
-      if (numDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " active delta directories. This can " +
-            "cause performance degradation.");
-      }
-
-      if (numObsoleteDeltas >=
-          HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_OBSOLETE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " obsolete delta directories. This can " +
-            "indicate compaction cleaner issues.");
-      }
-
-      if (numSmallDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_SMALL_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " small delta directories. This can " +
-            "indicate performance degradation and there might be a problem with your streaming setup.");
-      }
-    }
-  }
-
-  private static int getNumObsoleteDeltas(AcidDirectory dir, long checkThresholdInSec) throws IOException {
-    int numObsoleteDeltas = 0;
-    for (Path obsolete : dir.getObsolete()) {
-      FileStatus stat = dir.getFs().getFileStatus(obsolete);
-      if (System.currentTimeMillis() - stat.getModificationTime() >= checkThresholdInSec * 1000) {
-        numObsoleteDeltas++;
-      }
-    }
-    return numObsoleteDeltas;
-  }
-
-  public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
-      Arrays.stream(DeltaFilesMetricType.values())
-        .filter(type -> jobConf.get(type.name()) != null)
-        .forEach(type ->
-            Splitter.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).split(jobConf.get(type.name())).forEach(
-              (path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt))
-            )
-        );
-    }
-  }
-
-  public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType,
-      Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) {
-    try {
-      deltaFilesStats.forEach((type, value) -> conf
-          .set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
-
-    } catch (Exception e) {
-      LOG.warn("Couldn't add Delta metrics to conf object", e);
-    }
-  }
-
-  public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-      try {
-        Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null)
-            .forEach(type -> jobConf.set(type.name(), conf.get(type.name())));
-      } catch (Exception e) {
-        LOG.warn("Couldn't back propagate Delta metrics to jobConf object", e);
-      }
-    }
-  }
 
   private static long getBaseSize(AcidDirectory dir) throws IOException {
     long baseSize = 0;

Review comment:
       What do you think about getting rid of this bit?
   ```
   for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
           baseSize += origStat.getFileStatus().getLen();
   ```

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly create delta directories, that would require a fresh AcidDirectory
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs are either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just base dir
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if the size of the current dirs is bigger than the size of delete deltas, it means we have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();
+        }
+        if (deltaNum > deltasThreshold) {
+          updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, deltaNum, client);
+        } else {
+          client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+        }
+      }
+
+      LOG.debug("Finished updating delta file metrics from worker.\n deltasThreshold = {}, "
+              + "obsoleteDeltasThreshold = {}, numObsoleteDeltas = {}",
+          deltasThreshold, obsoleteDeltasThreshold, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromCleaner(String dbName, String tableName, String partitionName,
+      int deletedFilesCount, Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from cleaner");
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    try {
+      CompactionMetricsData prevObsoleteDelta =
+          txnHandler.getCompactionMetricsData(dbName, tableName, partitionName,
+              CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS);
+      int numObsoleteDeltas = 0;
+      if (prevObsoleteDelta != null) {
+        numObsoleteDeltas = prevObsoleteDelta.getMetricValue() - deletedFilesCount;
+        if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+          updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+              numObsoleteDeltas, txnHandler);
+        } else {
+          txnHandler.removeCompactionMetricsData(dbName, tableName, partitionName,
+              CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS);
+        }
+      }
+
+      LOG.debug("Finished updating delta file metrics from cleaner.\n obsoleteDeltasThreshold = {}, "
+              + "numObsoleteDeltas = {}", obsoleteDeltasThreshold, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  private static void updateMetrics(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type, int numDeltas, TxnStore txnHandler) throws MetaException {
+    CompactionMetricsData delta = txnHandler.getCompactionMetricsData(dbName, tblName, partitionName, type);

Review comment:
       Nit: The name `delta` is a bit confusing

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)

Review comment:
       Just a thought - we're inserting data (not updating) iff version == 1, 
otherwise it's an update. Do you think it would make sense to combine 
update_compaction_metrics_data and add_compaction_metrics_data?
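   A possible combined shape (sketch with a hypothetical method name, based on the HMSHandler methods above):
   ```
   // Sketch: one entry point; version == 1 means the row does not exist yet,
   // so insert, otherwise perform the version-guarded update.
   public boolean upsert_compaction_metrics_data(CompactionMetricsDataStruct data, int version) throws MetaException {
     if (version == 1) {
       getTxnHandler().addCompactionMetricsData(CompactionMetricsDataConverter.structToData(data));
       return true;
     }
     return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(data), version);
   }
   ```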

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
##########
@@ -34,10 +34,10 @@
   /**
    * Initializes static Metrics instance.
    */
-  public synchronized static void init(HiveConf conf) throws Exception {
+  public synchronized static void init(Configuration conf) throws Exception {

Review comment:
       Why are the changes in this file necessary?

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      updateCompactionMetricsData(data, version);
+    }
+    return true;
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());

Review comment:
       Might be setting partition name to the string "null" – is this what we 
want?
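   If so, a null-safe bind sketch (illustrative only):
   ```
   if (data.getPartitionName() != null) {
     pstmt.setString(3, data.getPartitionName());
   } else {
     // Sketch: store an explicit SQL NULL for unpartitioned tables
     pstmt.setNull(3, java.sql.Types.VARCHAR);
   }
   ```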

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws 
MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String 
tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new 
CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return 
builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                
.metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + 
StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int 
limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : 
CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, 
NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new 
CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute 
getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int 
version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + 
version + ")");
+        throw new MetaException("Unable to execute 
updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      updateCompactionMetricsData(data, version);
+    }
+    return true;
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws 
MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = 
dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());
+          pstmt.setString(4, data.getMetricType().toString());
+          pstmt.setInt(5, data.getMetricValue());
+          pstmt.setInt(6, data.getVersion());
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "addCompactionMetricsData(" + data + ")");
+        throw new MetaException("Unable to execute addCompactionMetricsData()" 
+ stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws 
MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String 
tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new 
CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return 
builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                
.metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + 
StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int 
limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : 
CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, 
NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new 
CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute 
getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int 
version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + 
version + ")");
+        throw new MetaException("Unable to execute 
updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return updateCompactionMetricsData(data, version);
+    }
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());
+          pstmt.setString(4, data.getMetricType().toString());
+          pstmt.setInt(5, data.getMetricValue());
+          pstmt.setInt(6, data.getVersion());
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "addCompactionMetricsData(" + data + ")");
+        throw new MetaException("Unable to execute addCompactionMetricsData()" 
+ stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      addCompactionMetricsData(data);
+    }
+  }
+
+  public void removeCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = DELETE_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "removeCompactionMetricsData(" + dbName + ", " +  
tblName + ", " + partitionName + ", " +
+            type + ")");
+        throw new MetaException("Unable to execute 
removeCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -398,13 +156,6 @@ private static long getBaseSize(AcidDirectory dir) throws IOException {
     return baseSize;
   }
 
-  private static long getModificationTime(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-      .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
-      .mapToLong(FileStatus::getModificationTime)
-      .max()
-      .orElse(new Date().getTime());
-  }
 
   private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {

Review comment:
       I'm a bit concerned about this method. Do you think it might slow down the Initiator/Worker/Cleaner?
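
   For a sense of the cost, a hedged sketch of what getDirSize presumably amounts to (its body is not shown in this hunk; the shape is inferred from the getModificationTime helper removed above):

   private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
     // Summing file lengths requires listing every file in the directory, i.e. at
     // least one NameNode RPC per delta on HDFS; called once per delta, that is
     // O(number of deltas) listing calls per table/partition scan.
     return dir.getFiles(fs, Ref.from(false)).stream()
         .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
         .mapToLong(FileStatus::getLen)
         .sum();
   }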

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -34,99 +26,60 @@
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsDataStruct;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsMetricType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-
 import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.HashMap;
 import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_SMALL_DELTAS;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-
 /**
  * Collects and publishes ACID compaction related metrics.
- * Everything should be behind 2 feature flags: {@link HiveConf.ConfVars#HIVE_SERVER2_METRICS_ENABLED} and
+ * Everything should be behind 2 feature flags: {@link MetastoreConf.ConfVars#METRICS_ENABLED} and
  * {@link MetastoreConf.ConfVars#METASTORE_ACIDMETRICS_EXT_ON}.
- * First we store the information in the jobConf, then in Tez Counters, then in a cache stored here, then in a custom
- * MBean.
+ * First we store the information in the HMS backend DB COMPACTION_METRICS_CACHE table, then in a custom MBean.

Review comment:
       Maybe also mention that these values are logged...
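
   For example, the suggested sentence could read something like this (hedged; the exact wording should be checked against what the reporting task actually logs):

   + * First we store the information in the HMS backend DB COMPACTION_METRICS_CACHE table, then in a custom MBean.
   + * The collected values are also written to the logs periodically by the reporting task.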

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate the active, small and
+      // obsolete delta counts
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n 
deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = 
{},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, 
numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics 
will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly created delta directories, that would require a fresh AcidDirectory
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs is either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just the base dir.
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had delete deltas (minor compaction then creates a new delete delta dir)
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if there are more current dirs than delete deltas, it means we have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();

Review comment:
       There is a high chance that directory.getCurrentDirectories().size() > prevDelta.getMetricvalue(), i.e. more deltas were inserted since the initiator queued this compaction... in this case deltaNum shouldn't be changed.
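
   A hedged sketch of one way to guard this (identifiers come from the hunk above; the control flow is an illustration, not the author's code):

   int deltaNum = numNewDeltas;
   if (prevDelta != null) {
     int compactedAway = prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();
     if (compactedAway >= 0) {
       // The cached count still covers the deltas this compaction removed.
       deltaNum += compactedAway;
     } else {
       // More deltas landed after the initiator queued this compaction; the cached
       // value is stale, so keep the stored metric as it is.
       deltaNum = prevDelta.getMetricvalue();
     }
   }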

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate the active, small and
+      // obsolete delta counts
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();

Review comment:
       Should probably be:

   int numObsoleteDeltas = dir.getObsolete().size() + dir.getAbortedDirectories().size();

   And the Cleaner's update should also subtract the aborted directories.

   I'm open to discussing this, though.
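
   For the Cleaner side, a hedged sketch of that subtraction (built on this PR's updateDeltaFilesMetrics helper; whether getAbortedDirectories() is the right source here is exactly what is open for discussion):

   int removedDeltaDirs = dir.getObsolete().size() + dir.getAbortedDirectories().size();
   updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, removedDeltaDirs);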

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
+    return success;

Review comment:
       - Base directories and original files may be in the list of obsolete files the Cleaner deletes. Do you think it would be worth filtering the obsolete/aborted files’ names for “delta”?
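
   A hedged sketch of that filter (assuming AcidUtils.DELTA_PREFIX and AcidUtils.DELETE_DELTA_PREFIX are the right constants; an illustration, not a drop-in):

   long deletedDeltaDirs = obsoleteDirs.stream()
       .map(Path::getName)
       .filter(n -> n.startsWith(AcidUtils.DELTA_PREFIX) || n.startsWith(AcidUtils.DELETE_DELTA_PREFIX))
       .count();
   updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, (int) deletedDeltaDirs);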

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       Should probably be updated only if success == true (I believe this means that at least 1 dir was removed).
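
   A minimal sketch of that guard (identifiers from the hunk above; illustration only):

   boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
   if (success) {
     updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
   }
   return success;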

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -139,157 +92,37 @@ public static DeltaFilesMetricReporter getInstance() {
     return InstanceHolder.instance;
   }
 
-  public static synchronized void init(HiveConf conf) throws Exception {
-    getInstance().configure(conf);
+  public static synchronized void init(Configuration conf, TxnStore txnHandler) throws Exception {
+    if (!initialized) {
+      getInstance().configure(conf, txnHandler);
+      initialized = true;
+    }
   }
 
-  private void configure(HiveConf conf) throws Exception {
+  private void configure(Configuration conf, TxnStore txnHandler) throws Exception {
     long reportingInterval =
-        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
-    hiveEntitySeparator = conf.getVar(HiveConf.ConfVars.HIVE_ENTITY_SEPARATOR);
+        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
+
+    maxCacheSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_MAX_CACHE_SIZE);
 
-    initCachesForMetrics(conf);
     initObjectsForMetrics();
 
     ThreadFactory threadFactory =
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
-    executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+    reporterExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    reporterExecutorService.scheduleAtFixedRate(new ReportingTask(txnHandler), 0, reportingInterval, TimeUnit.SECONDS);
 
     LOG.info("Started DeltaFilesMetricReporter thread");

Review comment:
       This is a duplicate log line

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       More readable solution: update only if dir.getObsolete().size() > 0.
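
   Combined with the success check suggested above, a hedged sketch (illustration only):

   if (success && dir.getObsolete().size() > 0) {
     updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
   }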



