This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bfdaeab  HIVE-23624: Add metastore metrics to show the compaction status (#1064) (Peter Vary reviewed by Laszlo Pinter)
bfdaeab is described below

commit bfdaeabb85ec4d91a816f50a44927e2e17cb1b5b
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Jun 9 09:58:35 2020 +0200

    HIVE-23624: Add metastore metrics to show the compaction status (#1064) (Peter Vary reviewed by Laszlo Pinter)
    
    HIVE-23624: Add metastore metrics to show the compaction status (#1064) (Peter Vary reviewed by Laszlo Pinter)
---
 .../hive/ql/txn/compactor/CompactorThread.java     |   1 -
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  38 +++++-
 .../hive/ql/txn/compactor/TestInitiator.java       | 148 +++++++++++++++++++++
 .../hive/metastore/metrics/MetricsConstants.java   |   1 +
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |   3 +
 5 files changed, 189 insertions(+), 2 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index fb23c2f..1b0af0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 7913295..5b2c937 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +40,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -92,6 +95,7 @@ public class Initiator extends MetaStoreCompactorThread {
       long abortedTimeThreshold = HiveConf
           .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
               TimeUnit.MILLISECONDS);
+      boolean metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED);
 
       // Make sure we run through the loop once before checking to stop as this makes testing
       // much easier.  The stop value is only for testing anyway and not used when called from
@@ -109,9 +113,13 @@ public class Initiator extends MetaStoreCompactorThread {
           long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart)/1000;
           prevStart = startedAt;
 
-          //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
 
+          if (metricsEnabled) {
+            // Update compaction metrics based on showCompactions result
+            updateCompactionMetrics(currentCompactions);
+          }
+
           Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold,
               abortedTimeThreshold, compactionInterval)
               .stream()
@@ -487,4 +495,32 @@ public class Initiator extends MetaStoreCompactorThread {
     }
     return true;
   }
+
+  @VisibleForTesting
+  protected static void updateCompactionMetrics(ShowCompactResponse showCompactResponse) {
+    Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
+
+    // Get the last compaction for each db/table/partition
+    for(ShowCompactResponseElement element : showCompactResponse.getCompacts()) {
+      String key = element.getDbname() + "/" + element.getTablename() +
+          (element.getPartitionname() != null ? "/" + element.getPartitionname() : "");
+      // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id
+      lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? element : old));
+    }
+
+    // Get the current count for each state
+    Map<String, Long> counts = lastElements.values().stream()
+        .collect(Collectors.groupingBy(e -> e.getState(), Collectors.counting()));
+
+    // Update metrics
+    for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) {
+      String key = MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.COMPACTION_STATES[i];
+      Long count = counts.get(TxnStore.COMPACTION_STATES[i]);
+      if (count != null) {
+        Metrics.getOrCreateGauge(key).set(count.intValue());
+      } else {
+        Metrics.getOrCreateGauge(key).set(0);
+      }
+    }
+  }
 }
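
A minimal read-side sketch of the gauges registered by updateCompactionMetrics above, assuming Metrics.initialize(conf) has already been called and the Initiator loop has run at least once. CompactionGaugeReader is a hypothetical class name; every other identifier comes from this patch.

    import org.apache.hadoop.hive.metastore.metrics.Metrics;
    import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;

    // Hypothetical helper, not part of this commit.
    public class CompactionGaugeReader {
      public static void printCompactionCounts() {
        // One gauge per compaction state, named COMPACTION_STATUS_PREFIX + state.
        for (String state : TxnStore.COMPACTION_STATES) {
          String gaugeName = MetricsConstants.COMPACTION_STATUS_PREFIX + state;
          System.out.println(gaugeName + " = " + Metrics.getOrCreateGauge(gaugeName).intValue());
        }
      }
    }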
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 279de19..f3ab5ce 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -34,6 +34,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -50,6 +54,7 @@ import java.util.concurrent.TimeUnit;
  * Tests for the compactor Initiator thread.
  */
 public class TestInitiator extends CompactorTest {
+  private final String INITIATED_METRICS_KEY = MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE;
 
   @Test
   public void nothing() throws Exception {
@@ -817,6 +822,149 @@ public class TestInitiator extends CompactorTest {
     Assert.assertEquals(10, compacts.size());
   }
 
+  @Test
+  public void testInitiatorMetricsEnabled() throws Exception {
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
+    Metrics.initialize(conf);
+    int originalValue = Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue();
+    Table t = newTable("default", "ime", true);
+    List<LockComponent> components = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      Partition p = newPartition(t, "part" + (i + 1));
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+      comp.setTablename("ime");
+      comp.setPartitionname("ds=part" + (i + 1));
+      comp.setOperationType(DataOperationType.UPDATE);
+      components.add(comp);
+    }
+    burnThroughTransactions("default", "ime", 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId("default", "ime", txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(10, compacts.size());
+
+    // The metrics will appear after the next Initiator run
+    startInitiator();
+
+    Assert.assertEquals(originalValue + 10,
+        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE).intValue());
+  }
+
+  @Test
+  public void testInitiatorMetricsDisabled() throws Exception {
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, false);
+    Metrics.initialize(conf);
+    int originalValue = Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue();
+    Table t = newTable("default", "imd", true);
+    List<LockComponent> components = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      Partition p = newPartition(t, "part" + (i + 1));
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+      comp.setTablename("imd");
+      comp.setPartitionname("ds=part" + (i + 1));
+      comp.setOperationType(DataOperationType.UPDATE);
+      components.add(comp);
+    }
+    burnThroughTransactions("default", "imd", 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId("default", "imd", txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(10, compacts.size());
+
+    // The metrics will appear after the next Initiator run
+    startInitiator();
+
+    Assert.assertEquals(originalValue,
+        Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue());
+  }
+
+  @Test
+  public void testUpdateCompactionMetrics() {
+    Metrics.initialize(conf);
+    ShowCompactResponse scr = new ShowCompactResponse();
+    List<ShowCompactResponseElement> elements = new ArrayList<>();
+    elements.add(generateElement(1,"db", "tb", null, CompactionType.MAJOR, TxnStore.FAILED_RESPONSE));
+    // Check for overwrite
+    elements.add(generateElement(2,"db", "tb", null, CompactionType.MAJOR, TxnStore.INITIATED_RESPONSE));
+    elements.add(generateElement(3,"db", "tb2", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE));
+    elements.add(generateElement(5,"db", "tb3", "p1", CompactionType.MINOR, TxnStore.ATTEMPTED_RESPONSE));
+    // Check for overwrite where the order is different
+    elements.add(generateElement(4,"db", "tb3", "p1", CompactionType.MINOR, TxnStore.FAILED_RESPONSE));
+
+    elements.add(generateElement(6,"db1", "tb", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE));
+    elements.add(generateElement(7,"db1", "tb2", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE));
+    elements.add(generateElement(8,"db1", "tb3", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE));
+
+    elements.add(generateElement(9,"db2", "tb", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));
+    elements.add(generateElement(10,"db2", "tb2", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));
+    elements.add(generateElement(11,"db2", "tb3", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));
+    elements.add(generateElement(12,"db2", "tb4", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));
+
+    elements.add(generateElement(13,"db3", "tb3", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
+    elements.add(generateElement(14,"db3", "tb4", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
+    elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
+    elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
+    elements.add(generateElement(17,"db3", "tb7", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
+
+    scr.setCompacts(elements);
+    Initiator.updateCompactionMetrics(scr);
+
+    Assert.assertEquals(1,
+        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.ATTEMPTED_RESPONSE).intValue());
+    Assert.assertEquals(2,
+        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE).intValue());
+    Assert.assertEquals(3,
+        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.FAILED_RESPONSE).intValue());
+    Assert.assertEquals(4,
+        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.SUCCEEDED_RESPONSE).intValue());
+    Assert.assertEquals(5,
+        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.WORKING_RESPONSE).intValue());
+    Assert.assertEquals(0,
+        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.CLEANING_RESPONSE).intValue());
+  }
+
+  private ShowCompactResponseElement generateElement(long id, String db, String table, String partition,
+      CompactionType type, String state) {
+    ShowCompactResponseElement element = new ShowCompactResponseElement(db, table, type, state);
+    element.setId(id);
+    element.setPartitionname(partition);
+    return element;
+  }
+
   @Override
   boolean useHive130DeltaDirName() {
     return false;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
index 24c8c4c..7ae98fe 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.metrics;
 public class MetricsConstants {
   public static final String ACTIVE_CALLS = "active_calls_";
   public static final String API_PREFIX = "api_";
+  public static final String COMPACTION_STATUS_PREFIX = "compaction_num_";
 
   public static final String TOTAL_API_CALLS = "total_api_calls";
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3e441b5..0d5b669 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -56,6 +56,9 @@ public interface TxnStore extends Configurable {
   String SUCCEEDED_RESPONSE = "succeeded";
   String ATTEMPTED_RESPONSE = "attempted";
 
+  String[] COMPACTION_STATES = new String[] {INITIATED_RESPONSE, WORKING_RESPONSE, CLEANING_RESPONSE, FAILED_RESPONSE,
+      SUCCEEDED_RESPONSE, ATTEMPTED_RESPONSE};
+
   int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;
 
   /**
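
For reference, a hedged sketch of how a gauge name is composed from the constants introduced above; GaugeNameExample is a hypothetical class, and only the state strings visible in this hunk, "succeeded" and "attempted", are taken as given. The remaining COMPACTION_STATES entries follow the same prefix-plus-state pattern.

    import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;

    // Hypothetical one-off check, not part of this commit. With
    // COMPACTION_STATUS_PREFIX = "compaction_num_" and SUCCEEDED_RESPONSE = "succeeded"
    // (both visible in this patch), this prints "compaction_num_succeeded".
    public class GaugeNameExample {
      public static void main(String[] args) {
        System.out.println(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.SUCCEEDED_RESPONSE);
      }
    }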
