This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 256fcbe HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter) 256fcbe is described below commit 256fcbe158938bdaf26b8adf01dcf591ca13da28 Author: Karen Coppage <klc...@apache.org> AuthorDate: Mon Sep 13 09:52:01 2021 +0200 HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter) Closes #2633. --- .../metrics/DeltaFilesMetricReporter.java | 86 ++++++++++++---------- .../ql/txn/compactor/TestDeltaFilesMetrics.java | 28 +++++++ 2 files changed, 74 insertions(+), 40 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java index 61b6a55..311d5ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java @@ -62,7 +62,6 @@ import java.util.Date; import java.util.EnumMap; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; @@ -225,40 +224,48 @@ public class DeltaFilesMetricReporter { EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) throws IOException { - long baseSize = getBaseSize(dir); - int numObsoleteDeltas = getNumObsoleteDeltas(dir, checkThresholdInSec); + try { + long baseSize = getBaseSize(dir); + int numObsoleteDeltas = getNumObsoleteDeltas(dir, checkThresholdInSec); - int numDeltas = 0; - int numSmallDeltas = 0; + int numDeltas = 0; + int numSmallDeltas = 0; - long now = new Date().getTime(); + long now = new Date().getTime(); - for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) { - if (now - getModificationTime(delta, dir.getFs()) >= checkThresholdInSec * 1000) { - numDeltas++; + for 
(AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) { + if (now - getModificationTime(delta, dir.getFs()) >= checkThresholdInSec * 1000) { + numDeltas++; - long deltaSize = getDirSize(delta, dir.getFs()); - if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) { - numSmallDeltas++; + long deltaSize = getDirSize(delta, dir.getFs()); + if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) { + numSmallDeltas++; + } } } - } - logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas); + logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas); - String serializedMetadata = conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA); - HashMap<Path, DeltaFilesMetadata> pathToMetadata = new HashMap<>(); - pathToMetadata = SerializationUtilities.deserializeObject(serializedMetadata, pathToMetadata.getClass()); - if (pathToMetadata == null) { - LOG.warn("Delta metrics can't be updated since the metadata is null."); - return; + String serializedMetadata = conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA); + if (serializedMetadata == null) { + LOG.warn("delta.files.metrics.metadata is missing from config. 
Delta metrics can't be updated."); + return; + } + HashMap<Path, DeltaFilesMetadata> pathToMetadata = new HashMap<>(); + pathToMetadata = SerializationUtilities.deserializeObject(serializedMetadata, pathToMetadata.getClass()); + if (pathToMetadata == null) { + LOG.warn("Delta metrics can't be updated since the metadata is null."); + return; + } + DeltaFilesMetadata metadata = pathToMetadata.get(dir.getPath()); + filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize); + filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats, + metadata, maxCacheSize); + filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats, metadata, + maxCacheSize); + } catch (Throwable t) { + LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t); } - DeltaFilesMetadata metadata = pathToMetadata.get(dir.getPath()); - filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize); - filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats, - metadata, maxCacheSize); - filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats, - metadata, maxCacheSize); } /** @@ -342,12 +349,6 @@ public class DeltaFilesMetricReporter { return numObsoleteDeltas; } - private static String getRelPath(AcidUtils.Directory directory) { - return directory.getPath().getName().contains("=") ? 
- directory.getPath().getParent().getName() + Path.SEPARATOR + directory.getPath().getName() : - directory.getPath().getName(); - } - public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) { if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) { @@ -364,19 +365,24 @@ public class DeltaFilesMetricReporter { public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) { - deltaFilesStats.forEach((type, value) -> - conf.set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value))); + try { + deltaFilesStats.forEach((type, value) -> conf + .set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value))); + + } catch (Exception e) { + LOG.warn("Couldn't add Delta metrics to conf object", e); + } } public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) { if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) { - - Arrays.stream(DeltaFilesMetricType.values()) - .filter(type -> conf.get(type.name()) != null) - .forEach(type -> - jobConf.set(type.name(), conf.get(type.name())) - ); + try { + Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null) + .forEach(type -> jobConf.set(type.name(), conf.get(type.name()))); + } catch (Exception e) { + LOG.warn("Couldn't back propagate Delta metrics to jobConf object", e); + } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java index ee7fbf5..e321f33 100644 --- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java @@ -19,10 +19,14 @@ package org.apache.hadoop.hive.ql.txn.compactor; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.io.AcidDirectory; import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter; import org.apache.tez.common.counters.TezCounters; import org.jetbrains.annotations.NotNull; @@ -35,8 +39,10 @@ import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.ObjectName; import java.lang.management.ManagementFactory; +import java.util.EnumMap; import java.util.HashMap; import java.util.Map; +import java.util.Queue; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS; @@ -174,6 +180,28 @@ public class TestDeltaFilesMetrics extends CompactorTest { }}, gaugeToMap(MetricsConstants.COMPACTION_NUM_DELTAS)); } + @Test + public void testMergeDeltaFilesStatsNullData() throws Exception { + setUpHiveConf(); + MetricsFactory.close(); + MetricsFactory.init(conf); + DeltaFilesMetricReporter.init(conf); + + AcidDirectory dir = new AcidDirectory(new Path("/"), FileSystem.get(conf), null); + long checkThresholdInSec = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_CHECK_THRESHOLD, TimeUnit.SECONDS); + float deltaPctThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_PCT_THRESHOLD); 
+ int deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); + int obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD); + int maxCacheSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE); + EnumMap<DeltaFilesMetricReporter.DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats = + new EnumMap<>(DeltaFilesMetricReporter.DeltaFilesMetricType.class); + + //conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA) will not have a value assigned; this test checks for an NPE + DeltaFilesMetricReporter.mergeDeltaFilesStats(dir,checkThresholdInSec, deltaPctThreshold, deltasThreshold, + obsoleteDeltasThreshold, maxCacheSize, deltaFilesStats, conf); + } + static void verifyMetricsMatch(Map<String, String> expected, Map<String, String> actual) { Assert.assertTrue("Actual metrics " + actual + " don't match expected: " + expected, equivalent(expected, actual));