This is an automated email from the ASF dual-hosted git repository. klcopp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 29c0e81 HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage) 29c0e81 is described below commit 29c0e81d938dbc7d56a600476c06ed9a6cba6298 Author: Viktor Csomor <csomor.vik...@gmail.com> AuthorDate: Mon Feb 7 09:37:53 2022 +0100 HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage) The common logic required by the `AcidMetricLogger` and the `AcidMetricService` has been extracted to a package-private component `CompactionMetricData`. This change made it possible to move the logging from AcidMetricService to AcidMetricLogger. Added methods: - logMultipleWorkerVersions - logFailedCompactionsPercentage - logOldestInitiatorAge Tests added. Closes #2995. --- .../ql/txn/compactor/TestCompactionMetrics.java | 1 - .../hadoop/hive/metastore/conf/MetastoreConf.java | 3 +- .../hive/metastore/metrics/AcidMetricLogger.java | 52 ++- .../hive/metastore/metrics/AcidMetricService.java | 149 ++------ .../metastore/metrics/CompactionMetricData.java | 210 +++++++++++ .../metrics/TestCompactionMetricData.java | 390 +++++++++++++++++++++ .../TestMultipleWorkerVersionDetection.java | 123 ------- 7 files changed, 678 insertions(+), 250 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java index eea0c3b..07a3212 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import 
org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index aa6b18d..ae23b2f 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -608,8 +608,7 @@ public class MetastoreConf { "hive.metastore.compactor.worker.detect.multiple.versions.threshold", 24, TimeUnit.HOURS, "Defines a time-window in hours from the current time backwards\n," + "in which a warning is being raised if multiple worker version are detected.\n" + - "The setting has no effect if the metastore.metrics.enabled is disabled \n" + - "or the metastore.acidmetrics.thread.on is turned off."), + "The setting has no effect if the metastore.compactor.acid.metrics.logger.frequency is 0."), COMPACTOR_MINOR_STATS_COMPRESSION( "metastore.compactor.enable.stats.compression", "metastore.compactor.enable.stats.compression", true, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java index e8ad33b..35450a4 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hive.metastore.metrics; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import 
org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData; import org.apache.hadoop.hive.metastore.txn.MetricsInfo; @@ -56,7 +58,7 @@ public class AcidMetricLogger implements MetastoreTaskThread { public void run() { try { logDbMetrics(); - logDeltaMetrics(); + logMetrics(); } catch (MetaException e) { LOG.warn("Caught exception while trying to log acid metrics data.", e); } @@ -74,6 +76,17 @@ public class AcidMetricLogger implements MetastoreTaskThread { return conf; } + private void logMetrics() throws MetaException { + ShowCompactResponse response = txnHandler.showCompact(new ShowCompactRequest()); + CompactionMetricData metricData = CompactionMetricData.of(response.getCompacts()); + + logMultipleWorkerVersions(metricData); + logFailedCompactionsPercentage(metricData); + logOldestInitiatorAge(metricData); + + logDeltaMetrics(); + } + private void logDeltaMetrics() throws MetaException { List<CompactionMetricsData> deltas = txnHandler.getTopCompactionMetricsDataPerType(maxCacheSize); deltas.stream().filter(d -> d.getMetricType() == NUM_DELTAS).forEach(d -> LOG.warn( @@ -89,6 +102,42 @@ public class AcidMetricLogger implements MetastoreTaskThread { AcidMetricService.getDeltaCountKey(d.getDbName(), d.getTblName(), d.getPartitionName()), d.getMetricValue()))); } + private void logOldestInitiatorAge(CompactionMetricData metricData) { + int oldestInitiatorAge = (int) ((System.currentTimeMillis() - metricData.getOldestEnqueueTime()) / 1000L); + String oldestInitiatorMessage = "Found compaction entry in compaction queue with an age of {} seconds. 
" + + "Consider increasing the number of worker threads."; + long oldestInitiatedWarningThreshold = MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING, + TimeUnit.SECONDS); + long oldestInitiatedErrorThreshold = MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR, + TimeUnit.SECONDS); + if (oldestInitiatorAge >= oldestInitiatedErrorThreshold) { + LOG.error(oldestInitiatorMessage, oldestInitiatorAge); + } else if (oldestInitiatorAge >= oldestInitiatedWarningThreshold) { + LOG.warn(oldestInitiatorMessage, oldestInitiatorAge); + } + } + + private void logMultipleWorkerVersions(CompactionMetricData metricData) { + long workerVersionThresholdInMillis = MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COMPACTOR_WORKER_DETECT_MULTIPLE_VERSION_THRESHOLD, TimeUnit.MILLISECONDS); + List<String> versions = metricData + .allWorkerVersionsSince(System.currentTimeMillis() - workerVersionThresholdInMillis); + + if (versions.size() > 1) { + LOG.warn("Multiple Compaction Worker versions detected: {}", versions); + } + } + + private void logFailedCompactionsPercentage(CompactionMetricData metricData) { + Double failedCompactionPercentage = metricData.getFailedCompactionPercentage(); + if (failedCompactionPercentage != null && + (failedCompactionPercentage >= + MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_FAILED_COMPACTION_RATIO_THRESHOLD))) { + LOG.warn("Many compactions are failing. 
Check root cause of failed/not initiated compactions."); + } + } private void logDbMetrics() throws MetaException { MetricsInfo metrics = txnHandler.getMetricsInfo(); if (metrics.getTxnToWriteIdCount() >= MetastoreConf.getIntVar(conf, @@ -170,4 +219,3 @@ public class AcidMetricLogger implements MetastoreTaskThread { } } } - diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java index 898efb2..8ce09ea 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.metastore.metrics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.AcidConstants; @@ -33,7 +32,6 @@ import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData; import org.apache.hadoop.hive.metastore.txn.MetricsInfo; @@ -46,15 +44,13 @@ import org.slf4j.LoggerFactory; import javax.management.MBeanServer; import javax.management.ObjectName; import java.lang.management.ManagementFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import 
java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATORS; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATOR_VERSIONS; @@ -121,9 +117,7 @@ public class AcidMetricService implements MetastoreTaskThread { } long startedAt = System.currentTimeMillis(); try { - ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); - detectMultipleWorkerVersions(currentCompactions); - updateMetrics(currentCompactions); + updateMetrics(); updateDeltaMetrics(); } catch (Exception ex) { LOG.error("Caught exception in AcidMetricService loop", ex); @@ -296,38 +290,12 @@ public class AcidMetricService implements MetastoreTaskThread { } } - private void detectMultipleWorkerVersions(ShowCompactResponse currentCompactions) { - long workerVersionThresholdInMillis = MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.COMPACTOR_WORKER_DETECT_MULTIPLE_VERSION_THRESHOLD, TimeUnit.MILLISECONDS); - long since = System.currentTimeMillis() - workerVersionThresholdInMillis; - - List<String> versions = collectWorkerVersions(currentCompactions.getCompacts(), since); - if (versions.size() > 1) { - LOG.warn("Multiple Compaction Worker versions detected: {}", versions); - } - } - - private void updateMetrics(ShowCompactResponse currentCompactions) throws MetaException { + private void updateMetrics() throws MetaException { + ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); updateMetricsFromShowCompact(currentCompactions, conf); updateDBMetrics(); } - @VisibleForTesting - public static List<String> collectWorkerVersions(List<ShowCompactResponseElement> 
currentCompacts, long since) { - return Optional.ofNullable(currentCompacts) - .orElseGet(ImmutableList::of) - .stream() - .filter(comp -> (comp.isSetEnqueueTime() && (comp.getEnqueueTime() >= since)) - || (comp.isSetStart() && (comp.getStart() >= since)) - || (comp.isSetEndTime() && (comp.getEndTime() >= since))) - .filter(comp -> !TxnStore.DID_NOT_INITIATE_RESPONSE.equals(comp.getState())) - .map(ShowCompactResponseElement::getWorkerVersion) - .filter(Objects::nonNull) - .distinct() - .sorted() - .collect(Collectors.toList()); - } - private void updateDBMetrics() throws MetaException { MetricsInfo metrics = txnHandler.getMetricsInfo(); Metrics.getOrCreateGauge(NUM_TXN_TO_WRITEID).set(metrics.getTxnToWriteIdCount()); @@ -347,112 +315,49 @@ public class AcidMetricService implements MetastoreTaskThread { Metrics.getOrCreateGauge(OLDEST_READY_FOR_CLEANING_AGE).set(metrics.getOldestReadyForCleaningAge()); } - - @VisibleForTesting public static void updateMetricsFromShowCompact(ShowCompactResponse showCompactResponse, Configuration conf) { - Map<String, ShowCompactResponseElement> lastElements = new HashMap<>(); - long oldestEnqueueTime = Long.MAX_VALUE; - long oldestWorkingTime = Long.MAX_VALUE; - long oldestCleaningTime = Long.MAX_VALUE; - - // Get the last compaction for each db/table/partition - for(ShowCompactResponseElement element : showCompactResponse.getCompacts()) { - String key = element.getDbname() + "/" + element.getTablename() + - (element.getPartitionname() != null ? "/" + element.getPartitionname() : ""); - - // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id - lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? 
element : old)); - - // find the oldest elements with initiated and working states - String state = element.getState(); - if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > element.getEnqueueTime())) { - oldestEnqueueTime = element.getEnqueueTime(); - } - - if (element.isSetStart()) { - if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > element.getStart())) { - oldestWorkingTime = element.getStart(); - } - } - - if (element.isSetCleanerStart()) { - if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > element.getCleanerStart())) { - oldestCleaningTime = element.getCleanerStart(); - } - } - } + CompactionMetricData metricData = CompactionMetricData.of(showCompactResponse.getCompacts()); // Get the current count for each state - Map<String, Long> counts = lastElements.values().stream() - .collect(Collectors.groupingBy(ShowCompactResponseElement::getState, Collectors.counting())); + Map<String, Long> counts = metricData.getStateCount(); // Update metrics for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) { String key = COMPACTION_STATUS_PREFIX + replaceWhitespace(TxnStore.COMPACTION_STATES[i]); Long count = counts.get(TxnStore.COMPACTION_STATES[i]); if (count != null) { - Metrics.getOrCreateGauge(key).set(count.intValue()); + Metrics.getOrCreateGauge(key) + .set(count.intValue()); } else { - Metrics.getOrCreateGauge(key).set(0); + Metrics.getOrCreateGauge(key) + .set(0); } } - Long numFailedComp = counts.get(TxnStore.FAILED_RESPONSE); - Long numNotInitiatedComp = counts.get(TxnStore.DID_NOT_INITIATE_RESPONSE); - Long numSucceededComp = counts.get(TxnStore.SUCCEEDED_RESPONSE); - if (numFailedComp != null && numNotInitiatedComp != null && numSucceededComp != null && - ((numFailedComp + numNotInitiatedComp) / (numFailedComp + numNotInitiatedComp + numSucceededComp) > - MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_FAILED_COMPACTION_RATIO_THRESHOLD))) { - LOG.warn("Many compactions are 
failing. Check root cause of failed/not initiated compactions."); - } + updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, metricData.getOldestEnqueueTime(), conf); + updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, metricData.getOldestWorkingTime(), conf); + updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, metricData.getOldestCleaningTime(), conf); - updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, oldestEnqueueTime, conf, - "Found compaction entry in compaction queue with an age of {} seconds. " + - "Consider increasing the number of worker threads.", - MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING, - MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR); - updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, oldestWorkingTime, conf); - updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, oldestCleaningTime, conf); - - long initiatorsCount = lastElements.values().stream() - //manually initiated compactions don't count - .filter(e -> !MANUALLY_INITIATED_COMPACTION.equals(getThreadIdFromId(e.getInitiatorId()))) - .map(e -> getHostFromId(e.getInitiatorId())).distinct().filter(e -> !NO_VAL.equals(e)).count(); - Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATORS).set((int) initiatorsCount); - long workersCount = lastElements.values().stream() - .map(e -> getHostFromId(e.getWorkerid())).distinct().filter(e -> !NO_VAL.equals(e)).count(); - Metrics.getOrCreateGauge(COMPACTION_NUM_WORKERS).set((int) workersCount); - - long initiatorVersionsCount = lastElements.values().stream() - .map(ShowCompactResponseElement::getInitiatorVersion).distinct().filter(Objects::nonNull).count(); - Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATOR_VERSIONS).set((int) initiatorVersionsCount); - long workerVersionsCount = lastElements.values().stream() - .map(ShowCompactResponseElement::getWorkerVersion).distinct().filter(Objects::nonNull).count(); - 
Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS).set((int) workerVersionsCount); - } + Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATORS) + .set((int) metricData.getInitiatorsCount()); + Metrics.getOrCreateGauge(COMPACTION_NUM_WORKERS) + .set((int) metricData.getWorkersCount()); - private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf) { - updateOldestCompactionMetric(metricName, oldestTime, conf, null, null, null); + Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATOR_VERSIONS) + .set((int) metricData.getInitiatorVersionsCount()); + Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS) + .set((int) metricData.getWorkerVersionsCount()); } - private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf, - String logMessage, MetastoreConf.ConfVars warningThreshold, MetastoreConf.ConfVars errorThreshold) { - if (oldestTime == Long.MAX_VALUE) { + private static void updateOldestCompactionMetric(String metricName, Long oldestTime, Configuration conf) { + if (oldestTime == null) { Metrics.getOrCreateGauge(metricName) .set(0); - return; - } - - int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L); - Metrics.getOrCreateGauge(metricName) - .set(oldestAge); - if (logMessage != null) { - if (oldestAge >= MetastoreConf.getTimeVar(conf, errorThreshold, TimeUnit.SECONDS)) { - LOG.error(logMessage, oldestAge); - } else if (oldestAge >= MetastoreConf.getTimeVar(conf, warningThreshold, TimeUnit.SECONDS)) { - LOG.warn(logMessage, oldestAge); - } + } else { + int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L); + Metrics.getOrCreateGauge(metricName) + .set(oldestAge); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java new file mode 100644 
index 0000000..81eee8d --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.metrics; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.txn.TxnStore; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.NO_VAL; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getHostFromId; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getThreadIdFromId; + +final class CompactionMetricData { + + private static final Long OLDEST_TIME_NO_VALUE = Long.MAX_VALUE; + + private final List<ShowCompactResponseElement> compacts; + + private long oldestEnqueueTime; + private long oldestWorkingTime; + private 
long oldestCleaningTime; + + private Map<String, Long> stateCount; + + private Double failedCompactionPercentage; + + private long initiatorsCount; + private long initiatorVersionsCount; + private long workersCount; + private long workerVersionsCount; + + private CompactionMetricData(List<ShowCompactResponseElement> compacts) { + this.compacts = compacts; + } + + static CompactionMetricData of(List<ShowCompactResponseElement> compacts) { + CompactionMetricData data = new CompactionMetricData(Optional.ofNullable(compacts) + .orElseGet(ImmutableList::of)); + data.init(); + return data; + } + + private void init() { + final Map<String, ShowCompactResponseElement> lastElements = new HashMap<>(); + + oldestEnqueueTime = OLDEST_TIME_NO_VALUE; + oldestWorkingTime = OLDEST_TIME_NO_VALUE; + oldestCleaningTime = OLDEST_TIME_NO_VALUE; + for (ShowCompactResponseElement element : compacts) { + final String key = element.getDbname() + "/" + element.getTablename() + + (element.getPartitionname() != null ? "/" + element.getPartitionname() : ""); + + // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id + lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? 
element : old)); + + // find the oldest elements with initiated and working states + String state = element.getState(); + if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > element.getEnqueueTime())) { + oldestEnqueueTime = element.getEnqueueTime(); + } + + if (element.isSetStart()) { + if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > element.getStart())) { + oldestWorkingTime = element.getStart(); + } + } + + if (element.isSetCleanerStart()) { + if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > element.getCleanerStart())) { + oldestCleaningTime = element.getCleanerStart(); + } + } + } + + stateCount = lastElements + .values() + .stream() + .collect(Collectors.groupingBy(ShowCompactResponseElement::getState, Collectors.counting())); + + failedCompactionPercentage = calculateFailedPercentage(stateCount); + + initiatorsCount = lastElements.values() + .stream() + //manually initiated compactions don't count + .filter(e -> !MANUALLY_INITIATED_COMPACTION.equals(getThreadIdFromId(e.getInitiatorId()))) + .map(e -> getHostFromId(e.getInitiatorId())) + .filter(e -> !NO_VAL.equals(e)) + .distinct() + .count(); + initiatorVersionsCount = lastElements.values() + .stream() + .map(ShowCompactResponseElement::getInitiatorVersion) + .filter(Objects::nonNull) + .distinct() + .count(); + + workersCount = lastElements.values() + .stream() + .map(e -> getHostFromId(e.getWorkerid())) + .filter(e -> !NO_VAL.equals(e)) + .distinct() + .count(); + workerVersionsCount = lastElements.values() + .stream() + .map(ShowCompactResponseElement::getWorkerVersion) + .filter(Objects::nonNull) + .distinct() + .count(); + } + + List<String> allWorkerVersionsSince(long since) { + return compacts.stream() + .filter(comp -> (comp.isSetEnqueueTime() && (comp.getEnqueueTime() >= since)) + || (comp.isSetStart() && (comp.getStart() >= since)) + || (comp.isSetEndTime() && (comp.getEndTime() >= since))) + .filter(comp -> 
!TxnStore.DID_NOT_INITIATE_RESPONSE.equals(comp.getState())) + .map(ShowCompactResponseElement::getWorkerVersion) + .filter(Objects::nonNull) + .distinct() + .sorted() + .collect(Collectors.toList()); + } + + Map<String, Long> getStateCount() { + return new HashMap<>(stateCount); + } + + Long getOldestEnqueueTime() { + return nullIfNotSet(oldestEnqueueTime); + } + + Long getOldestWorkingTime() { + return nullIfNotSet(oldestWorkingTime); + } + + Long getOldestCleaningTime() { + return nullIfNotSet(oldestCleaningTime); + } + + Double getFailedCompactionPercentage() { + return failedCompactionPercentage; + } + + long getInitiatorsCount() { + return initiatorsCount; + } + + long getInitiatorVersionsCount() { + return initiatorVersionsCount; + } + + long getWorkersCount() { + return workersCount; + } + + long getWorkerVersionsCount() { + return workerVersionsCount; + } + + private static Long nullIfNotSet(long value) { + if (value == OLDEST_TIME_NO_VALUE) { + return null; + } + return value; + } + + private static Double calculateFailedPercentage(Map<String, Long> stateCount) { + long failed = unwrapToPrimitive(stateCount.get(TxnStore.FAILED_RESPONSE)); + long notInitiated = unwrapToPrimitive(stateCount.get(TxnStore.DID_NOT_INITIATE_RESPONSE)); + long succeeded = unwrapToPrimitive(stateCount.get(TxnStore.SUCCEEDED_RESPONSE)); + + long denominator = failed + notInitiated + succeeded; + if (denominator > 0) { + long numerator = failed + notInitiated; + return Long.valueOf(numerator).doubleValue() / Long.valueOf(denominator).doubleValue(); + } + + return null; + } + + private static long unwrapToPrimitive(Long value) { + if (value == null) { + return 0L; + } + return value; + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java new file mode 100644 index 
0000000..aa603bd --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.metrics; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Collections; +import java.util.UUID; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +@Category(MetastoreUnitTest.class) +public class TestCompactionMetricData { + + private static final long SINCE_EPOCH = 0L; + + private static final String INITIATED = "initiated"; + private static final String WORKING = "working"; + private static final String READY_FOR_CLEANING = "ready for cleaning"; + 
private static final String DID_NOT_INITIATE = "did not initiate"; + private static final String SUCCEEDED = "succeeded"; + private static final String FAILED = "failed"; + + @Test + public void testStateCountsCountedCorrectly() { + assertThat( + CompactionMetricData.of(null) + .getStateCount(), + is(emptyMap())); + + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR), + aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR), + aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR), + aCompaction(5, "t2", "part1", WORKING, CompactionType.MAJOR), + aCompaction(6, "t2", "part2", WORKING, CompactionType.MAJOR), + aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR) + )) + .getStateCount(), + is(ImmutableMap.of( + INITIATED, 1L, + WORKING, 2L, + READY_FOR_CLEANING, 1L + ))); + } + + @Test + public void testOldestEnqueuedValueCalculatedCorrectly() { + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, 90L, null, null) + )) + .getOldestEnqueueTime(), + nullValue()); + + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, 150L, null, null), + aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR, 100L, null, null), + aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, 90L, null, null), + aCompaction(6, "t2", "part2", DID_NOT_INITIATE, CompactionType.MAJOR, 50L, null, null), + aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, 300L, null, null) + )) + .getOldestEnqueueTime(), + is(100L)); + } + + @Test + public void testOldestWorkingValueCalculatedCorrectly() { + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(3, "t2", "part1", INITIATED, CompactionType.MINOR, null, 90L, null) + )) + .getOldestWorkingTime(), + nullValue()); + + assertThat( + CompactionMetricData.of( + 
ImmutableList.of( + aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, null, 150L, null), + aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR, null, 100L, null), + aCompaction(4, "t2", "part1", WORKING, CompactionType.MINOR, null, 90L, null), + aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, null, 70L, null), + aCompaction(6, "t2", "part2", DID_NOT_INITIATE, CompactionType.MAJOR, null, 50L, null), + aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, 300L, null) + )) + .getOldestWorkingTime(), + is(70L)); + } + + @Test + public void testOldestCleaningValueCalculatedCorrectly() { + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(3, "t2", "part1", INITIATED, CompactionType.MINOR, null, null, 90L) + )) + .getOldestCleaningTime(), + nullValue()); + + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, null, null, 150L), + aCompaction(2, "t1", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, null, 100L), + aCompaction(4, "t2", "part1", WORKING, CompactionType.MINOR, null, null, 90L), + aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, null, null, 70L), + aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, null, 300L) + )) + .getOldestCleaningTime(), + is(100L)); + } + + @Test + public void testFailedPercentageCalculatedCorrectly() { + assertThat( + CompactionMetricData.of( + ImmutableList.of()) + .getFailedCompactionPercentage(), + nullValue()); + + assertThat( + CompactionMetricData.of( + ImmutableList.of(aCompaction(1, "t1", "p1", SUCCEEDED, CompactionType.MINOR))) + .getFailedCompactionPercentage(), + is(0.0D)); + + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR), + aCompaction(2, "t2", "p1", SUCCEEDED, CompactionType.MINOR))) + .getFailedCompactionPercentage(), + is(0.5D)); + + assertThat( + 
CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR), + aCompaction(2, "t2", "p1", DID_NOT_INITIATE, CompactionType.MINOR), + aCompaction(3, "t3", "p1", SUCCEEDED, CompactionType.MINOR), + aCompaction(4, "t4", "p1", SUCCEEDED, CompactionType.MINOR))) + .getFailedCompactionPercentage(), + is(0.5D)); + + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR), + aCompaction(2, "t2", "p1", DID_NOT_INITIATE, CompactionType.MINOR))) + .getFailedCompactionPercentage(), + is(1.0D)); + + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR))) + .getFailedCompactionPercentage(), + is(1.0D)); + + assertThat( + CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, "t1", "p1", DID_NOT_INITIATE, CompactionType.MINOR))) + .getFailedCompactionPercentage(), + is(1.0D)); + } + + @Test + public void testInitiatorCountCalculatedCorrectly() { + assertThat(CompactionMetricData.of(Collections.emptyList()) + .getInitiatorsCount(), + is(0L)); + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, null, null, null, (String) null), + aCompaction(2, "host1-initiator", null, null, (String) null), + aCompaction(3, "host2-initiator-manual", null, null, (String) null), + aCompaction(4, "host3-initiator", null, null, (String) null))) + .getInitiatorsCount(), + is(2L)); + } + + @Test + public void testInitiatorVersionsCalculatedCorrectly() { + assertThat(CompactionMetricData.of(Collections.emptyList()) + .getInitiatorVersionsCount(), + is(0L)); + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, null, "1.0", null, (String) null), + aCompaction(2, null, "3.0", null, (String) null), + aCompaction(3, null, "4.0", null, (String) null), + aCompaction(4, null, null, null, (String) null), + aCompaction(5, null, "4.0", null, (String) null))) + .getInitiatorVersionsCount(), + 
is(3L)); + } + + @Test + public void testWorkerCountCalculatedCorrectly() { + assertThat(CompactionMetricData.of(Collections.emptyList()) + .getWorkersCount(), + is(0L)); + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, null, null, null, "4.0"), + aCompaction(2, null, null, "host1-worker", "4.0"), + aCompaction(3, null, null, "host2-worker", "4.0"))) + .getWorkersCount(), + is(2L)); + } + + @Test + public void testWorkerVersionsCalculatedCorrectly() { + assertThat(CompactionMetricData.of(Collections.emptyList()) + .getWorkerVersionsCount(), + is(0L)); + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction(1, null, null, null, "1.0"), + aCompaction(2, null, null, null, "3.0"), + aCompaction(3, null, null, null, "4.0"), + aCompaction(4, null, null, null, (String) null), + aCompaction(5, null, null, null, "4.0"))) + .getWorkerVersionsCount(), + is(3L)); + } + + @Test + public void testCollectWorkerVersionsEmptyLists() { + assertThat(CompactionMetricData.of(Collections.emptyList()).allWorkerVersionsSince(SINCE_EPOCH), + + is(Collections.emptyList())); + } + + @Test + public void testCollectWorkerVersionsDidNotInitiateGettingFilteredOut() { + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction("DoNotShowUp", DID_NOT_INITIATE, 1L, 1L, 1L), + aCompaction("1.0", INITIATED, 1L, 1L, 1L))) + .allWorkerVersionsSince(SINCE_EPOCH), + + is(Collections.singletonList("1.0")) + ); + } + + @Test + public void testCollectWorkerVersionsNullVersionGettingFilteredOut() { + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction(null, INITIATED, 1L, 1L, 1L))) + .allWorkerVersionsSince(SINCE_EPOCH), + + is(Collections.emptyList()) + ); + } + + @Test + public void testCollectWorkerVersionsTimeThreshold() { + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction("0.0-not-shown", INITIATED, 99L, null, null), + aCompaction("0.1-not-shown", INITIATED, 99L, 99L, null), + aCompaction("0.2-not-shown", 
INITIATED, 99L, 99L, 99L), + + aCompaction("1.0", INITIATED, 100L, null, null), + aCompaction("1.1", WORKING, 99L, 100L, null), + aCompaction("1.2", SUCCEEDED, 99L, 99L, 100L) + )) + .allWorkerVersionsSince(100), + + is(ImmutableList.of("1.0", "1.1", "1.2")) + ); + } + + @Test + public void testCollectWorkerVersionsSortedAndAvoidDuplicates() { + assertThat(CompactionMetricData.of( + ImmutableList.of( + aCompaction("2.0", INITIATED, 1L, null, null), + aCompaction("2.1", INITIATED, 1L, null, null), + aCompaction("2.10", INITIATED, 1L, null, null), + aCompaction("2.2", INITIATED, 1L, null, null), + aCompaction("3.0", WORKING, 1L, null, null), + aCompaction("1.0", INITIATED, 1L, null, null), + aCompaction("1.0", WORKING, 1L, null, null) + )) + .allWorkerVersionsSince(SINCE_EPOCH), + + is(ImmutableList.of("1.0", "2.0", "2.1", "2.10", "2.2", "3.0")) + ); + } + + private static ShowCompactResponseElement aCompaction(long id, String table, String partition, String state, + CompactionType type) { + ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", table, type, state); + e.setId(id); + e.setPartitionname(partition); + return e; + } + + private static ShowCompactResponseElement aCompaction(long id, + String initiatorId, String initiatorVersion, + String workerId, String workerVersion) { + ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", UUID.randomUUID().toString(), + CompactionType.MAJOR, INITIATED); + e.setId(id); + + e.setInitiatorId(initiatorId); + e.setInitiatorVersion(initiatorVersion); + + e.setWorkerid(workerId); + e.setWorkerVersion(workerVersion); + + return e; + } + + private static ShowCompactResponseElement aCompaction(long id, String table, String partition, String state, + CompactionType type, Long enqueuedTime, Long startTime, Long cleanerStart) { + ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", table, type, state); + e.setId(id); + e.setPartitionname(partition); + + if (enqueuedTime != 
null) { + e.setEnqueueTime(enqueuedTime); + } + + if (startTime != null) { + e.setStart(startTime); + } + + if (cleanerStart != null) { + e.setCleanerStart(cleanerStart); + } + + return e; + } + + private static ShowCompactResponseElement aCompaction(String workerVersion, String state, + Long enqueuedTime, Long startTime, Long endTime) { + + ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", "table_name", CompactionType.MINOR, state); + e.setWorkerVersion(workerVersion); + + if (enqueuedTime != null) { + e.setEnqueueTime(enqueuedTime); + } + + if (startTime != null) { + e.setStart(startTime); + } + + if (endTime != null) { + e.setEndTime(endTime); + } + + return e; + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java deleted file mode 100644 index 1476f4d..0000000 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.metrics; - -import com.google.common.collect.ImmutableList; -import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.hamcrest.CoreMatchers; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.util.Collections; - -import static org.hamcrest.MatcherAssert.assertThat; - -@Category(MetastoreUnitTest.class) -public class TestMultipleWorkerVersionDetection { - - private final long SINCE_EPOCH = 0L; - - @Test - public void testCollectWorkerVersionsEmptyLists() { - assertThat(AcidMetricService.collectWorkerVersions(null, SINCE_EPOCH), CoreMatchers.is(Collections.emptyList())); - assertThat(AcidMetricService.collectWorkerVersions(Collections.emptyList(), SINCE_EPOCH), - CoreMatchers.is(Collections.emptyList())); - } - - @Test - public void testCollectWorkerVersionsDidNotInitiateGettingFilteredOut() { - assertThat(AcidMetricService.collectWorkerVersions( - ImmutableList.of( - showCompactResponse("DoNotShowUp", "did not initiate", 1L, 1L, 1L), - showCompactResponse("1.0", "initiated", 1L, 1L, 1L)), - SINCE_EPOCH), - - CoreMatchers.is(Collections.singletonList("1.0")) - ); - } - - @Test - public void testCollectWorkerVersionsNullVersionGettingFilteredOut() { - assertThat(AcidMetricService.collectWorkerVersions( - ImmutableList.of( - showCompactResponse(null, "initiated", 1L, 1L, 1L)), - SINCE_EPOCH), - - CoreMatchers.is(Collections.emptyList()) - ); - } - - @Test - public void testCollectWorkerVersionsTimeThreshold() { - assertThat(AcidMetricService.collectWorkerVersions( - ImmutableList.of( - showCompactResponse("0.0-not-shown", "initiated", 99L, null, null), - showCompactResponse("0.1-not-shown", "initiated", 99L, 99L, null), - 
showCompactResponse("0.2-not-shown", "initiated", 99L, 99L, 99L), - - showCompactResponse("1.0", "initiated", 100L, null, null), - showCompactResponse("1.1", "working", 99L, 100L, null), - showCompactResponse("1.2", "succeeded", 99L, 99L, 100L) - ), - 100), - - CoreMatchers.is(ImmutableList.of("1.0", "1.1", "1.2")) - ); - } - - @Test - public void testCollectWorkerVersionsSortedAndAvoidDuplicates() { - assertThat(AcidMetricService.collectWorkerVersions( - ImmutableList.of( - showCompactResponse("2.0", "initiated", 1L, null, null), - showCompactResponse("2.1", "initiated", 1L, null, null), - showCompactResponse("2.10", "initiated", 1L, null, null), - showCompactResponse("2.2", "initiated", 1L, null, null), - showCompactResponse("3.0", "working", 1L, null, null), - showCompactResponse("1.0", "initiated", 1L, null, null), - showCompactResponse("1.0", "working", 1L, null, null) - ), - SINCE_EPOCH), - - CoreMatchers.is(ImmutableList.of("1.0", "2.0", "2.1", "2.10", "2.2", "3.0")) - ); - } - - private static ShowCompactResponseElement showCompactResponse(String workerVersion, String state, - Long enqueuedTime, Long startTime, Long endTime) { - - ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", "table_name", CompactionType.MINOR, state); - e.setWorkerVersion(workerVersion); - - if (enqueuedTime != null) { - e.setEnqueueTime(enqueuedTime); - } - - if (startTime != null) { - e.setStart(startTime); - } - - if (endTime != null) { - e.setEndTime(endTime); - } - - return e; - } -}