This is an automated email from the ASF dual-hosted git repository. klcopp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 6a53540 HIVE-25737: Compaction Observability: Initiator/Worker/Cleaner cycle measurement improvements (Viktor Csomor, reviewed by Laszlo Pinter and Karen Coppage) 6a53540 is described below commit 6a535403a49b972bb0a488d65bab06c57e3c8c66 Author: Viktor Csomor <csomor.vik...@gmail.com> AuthorDate: Thu Dec 9 09:32:33 2021 +0100 HIVE-25737: Compaction Observability: Initiator/Worker/Cleaner cycle measurement improvements (Viktor Csomor, reviewed by Laszlo Pinter and Karen Coppage) Closes #2827. A daemon thread has been implemented for the Initiator that measures the elapsed time since its start. The PerformanceLogger approach is also kept but the metrics intended to use the gauge style The Age of Oldest Working Compaction metric has been implemented in the AcidMetricService The Age of Oldest active Cleaner metric has been implemented - CQ_CLEANER_START field added to the COMPACTION_QUEUE table - COMPACTION_OLDEST_CLEANING_AGE metric has been added - markCleaning method added to the CompactionTxnHandler - ShowCompactResponseElement extended with the cleanerStart field --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 16 ++ .../upgrade/hive/hive-schema-4.0.0.hive.sql | 21 ++- .../upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql | 21 ++- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 31 ++++ .../hadoop/hive/ql/txn/compactor/Initiator.java | 55 +++++- .../ql/txn/compactor/MetaStoreCompactorThread.java | 39 +++++ .../ql/txn/compactor/TestCompactionMetrics.java | 184 +++++++++++++++++---- .../test/results/clientpositive/llap/sysdb.q.out | 11 +- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 22 +++ .../src/gen/thrift/gen-cpp/hive_metastore_types.h | 12 +- .../metastore/api/ShowCompactResponseElement.java | 106 +++++++++++- .../metastore/ShowCompactResponseElement.php | 24 +++ .../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 +- .../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +- 
.../hadoop/hive/metastore/conf/MetastoreConf.java | 12 ++ .../src/main/thrift/hive_metastore.thrift | 3 +- .../hive/metastore/metrics/AcidMetricService.java | 71 +++++--- .../hive/metastore/metrics/MetricsConstants.java | 4 + .../hive/metastore/txn/CompactionTxnHandler.java | 133 +++++++++++++-- .../hadoop/hive/metastore/txn/TxnHandler.java | 39 +++-- .../apache/hadoop/hive/metastore/txn/TxnStore.java | 66 +++++++- .../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 3 +- .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 3 + .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 1 + .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 3 + .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 3 +- .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 3 + .../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 3 +- .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 4 +- .../sql/postgres/hive-schema-4.0.0.postgres.sql | 3 +- .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 4 +- .../upgrade-3.1.3000-to-4.0.0.postgres.sql | 3 + 32 files changed, 800 insertions(+), 121 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 988cec8..d9eec2e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3190,6 +3190,22 @@ public class HiveConf extends Configuration { "has had a transaction done on it since the last major compaction. 
So decreasing this\n" + "value will increase the load on the NameNode."), + HIVE_COMPACTOR_INITIATOR_DURATION_UPDATE_INTERVAL("hive.compactor.initiator.duration.update.interval", "60s", + new TimeValidator(TimeUnit.SECONDS), + "Time in seconds that drives the update interval of compaction_initiator_duration metric.\n" + + "Smaller value results in a fine grained metric update.\n" + + "This updater can be turned off if its value less than or equals to zero.\n"+ + "In this case the above metric will be update only after the initiator completed one cycle.\n" + + "The hive.compactor.initiator.on must be turned on (true) in-order to enable the Initiator,\n" + + "otherwise this setting has no effect."), + + HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL("hive.compactor.cleaner.duration.update.interval", "60s", + new TimeValidator(TimeUnit.SECONDS), + "Time in seconds that drives the update interval of compaction_cleaner_duration metric.\n" + + "Smaller value results in a fine grained metric update.\n" + + "This updater can be turned off if its value less than or equals to zero.\n"+ + "In this case the above metric will be update only after the cleaner completed one cycle."), + HIVE_COMPACTOR_REQUEST_QUEUE("hive.compactor.request.queue", 1, "Enables parallelization of the checkForCompaction operation, that includes many file metadata checks\n" + "and may be expensive"), diff --git a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql index 9de3488..d0654a5 100644 --- a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql +++ b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql @@ -1092,7 +1092,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `COMPACTION_QUEUE` ( `CQ_ERROR_MESSAGE` string, `CQ_INITIATOR_ID` string, `CQ_INITIATOR_VERSION` string, - `CQ_WORKER_VERSION` string + `CQ_WORKER_VERSION` string, + `CQ_CLEANER_START` bigint ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' 
TBLPROPERTIES ( @@ -1115,7 +1116,8 @@ TBLPROPERTIES ( \"COMPACTION_QUEUE\".\"CQ_ERROR_MESSAGE\", \"COMPACTION_QUEUE\".\"CQ_INITIATOR_ID\", \"COMPACTION_QUEUE\".\"CQ_INITIATOR_VERSION\", - \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\" + \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\", + \"COMPACTION_QUEUE\".\"CQ_CLEANER_START\" FROM \"COMPACTION_QUEUE\" " ); @@ -1187,7 +1189,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS` `C_HIGHEST_WRITE_ID`, `C_INITIATOR_HOST`, `C_INITIATOR_ID`, - `C_INITIATOR_VERSION` + `C_INITIATOR_VERSION`, + `C_CLEANER_START` ) AS SELECT CC_ID, @@ -1209,7 +1212,8 @@ SELECT CC_HIGHEST_WRITE_ID, CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE split(CC_INITIATOR_ID,"-")[0] END, CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE split(CC_INITIATOR_ID,"-")[size(split(CC_INITIATOR_ID,"-"))-1] END, - CC_INITIATOR_VERSION + CC_INITIATOR_VERSION, + NULL FROM COMPLETED_COMPACTIONS UNION ALL SELECT @@ -1231,7 +1235,8 @@ SELECT CQ_HIGHEST_WRITE_ID, CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE split(CQ_INITIATOR_ID,"-")[0] END, CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE split(CQ_INITIATOR_ID,"-")[size(split(CQ_INITIATOR_ID,"-"))-1] END, - CQ_INITIATOR_VERSION + CQ_INITIATOR_VERSION, + CQ_CLEANER_START FROM COMPACTION_QUEUE; CREATE EXTERNAL TABLE IF NOT EXISTS `SCHEDULED_QUERIES` ( @@ -1874,7 +1879,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS` `C_HIGHEST_WRITE_ID`, `C_INITIATOR_HOST`, `C_INITIATOR_ID`, - `C_INITIATOR_VERSION` + `C_INITIATOR_VERSION`, + `C_CLEANER_START` ) AS SELECT DISTINCT C_ID, @@ -1895,7 +1901,8 @@ SELECT DISTINCT C_HIGHEST_WRITE_ID, C_INITIATOR_HOST, C_INITIATOR_ID, - C_INITIATOR_VERSION + C_INITIATOR_VERSION, + C_CLEANER_START FROM `sys`.`COMPACTIONS` C JOIN `sys`.`TBLS` T ON (C.`C_TABLE` = T.`TBL_NAME`) JOIN `sys`.`DBS` D ON (C.`C_DATABASE` = D.`NAME`) diff --git a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql index 
6e3a9e1..fb76575 100644 --- a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql +++ b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql @@ -210,7 +210,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `COMPACTION_QUEUE` ( `CQ_ERROR_MESSAGE` string, `CQ_INITIATOR_ID` string, `CQ_INITIATOR_VERSION` string, - `CQ_WORKER_VERSION` string + `CQ_WORKER_VERSION` string, + `CQ_CLEANER_START` bigint ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( @@ -233,7 +234,8 @@ TBLPROPERTIES ( \"COMPACTION_QUEUE\".\"CQ_ERROR_MESSAGE\", \"COMPACTION_QUEUE\".\"CQ_INITIATOR_ID\", \"COMPACTION_QUEUE\".\"CQ_INITIATOR_VERSION\", - \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\" + \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\", + \"COMPACTION_QUEUE\".\"CQ_CLEANER_START\" FROM \"COMPACTION_QUEUE\" " ); @@ -305,7 +307,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS` `C_HIGHEST_WRITE_ID`, `C_INITIATOR_HOST`, `C_INITIATOR_ID`, - `C_INITIATOR_VERSION` + `C_INITIATOR_VERSION`, + `C_CLEANER_START` ) AS SELECT CC_ID, @@ -327,7 +330,8 @@ SELECT CC_HIGHEST_WRITE_ID, CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE split(CC_INITIATOR_ID,"-")[0] END, CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE split(CC_INITIATOR_ID,"-")[size(split(CC_INITIATOR_ID,"-"))-1] END, - CC_INITIATOR_VERSION + CC_INITIATOR_VERSION, + NULL FROM COMPLETED_COMPACTIONS UNION ALL SELECT @@ -349,7 +353,8 @@ SELECT CQ_HIGHEST_WRITE_ID, CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE split(CQ_INITIATOR_ID,"-")[0] END, CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE split(CQ_INITIATOR_ID,"-")[size(split(CQ_INITIATOR_ID,"-"))-1] END, - CQ_INITIATOR_VERSION + CQ_INITIATOR_VERSION, + CQ_CLEANER_START FROM COMPACTION_QUEUE; -- HIVE-22553 @@ -841,7 +846,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS` `C_HIGHEST_WRITE_ID`, `C_INITIATOR_HOST`, `C_INITIATOR_ID`, - `C_INITIATOR_VERSION` + `C_INITIATOR_VERSION`, + `C_CLEANER_START` ) AS SELECT DISTINCT C_ID, @@ -862,7 +868,8 @@ SELECT 
DISTINCT C_HIGHEST_WRITE_ID, C_INITIATOR_HOST, C_INITIATOR_ID, - C_INITIATOR_VERSION + C_INITIATOR_VERSION, + C_CLEANER_START FROM `sys`.`COMPACTIONS` C JOIN `sys`.`TBLS` T ON (C.`C_TABLE` = T.`TBL_NAME`) JOIN `sys`.`DBS` D ON (C.`C_DATABASE` = D.`NAME`) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 8149494..38f4fec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -113,6 +113,14 @@ public class Cleaner extends MetaStoreCompactorThread { try { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); startedAt = System.currentTimeMillis(); + + if (metricsEnabled) { + stopCycleUpdater(); + startCycleUpdater(HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL, TimeUnit.MILLISECONDS), + new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt)); + } + long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime); @@ -145,6 +153,10 @@ public class Cleaner extends MetaStoreCompactorThread { if (handle != null) { handle.releaseLocks(); } + if (metricsEnabled) { + updateCycleDurationMetric(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt); + } + stopCycleUpdater(); } // Now, go back to bed until it's time to do this again long elapsedTime = System.currentTimeMillis() - startedAt; @@ -204,6 +216,9 @@ public class Cleaner extends MetaStoreCompactorThread { return; } } + + txnHandler.markCleanerStart(ci); + StorageDescriptor sd = resolveStorageDescriptor(t, p); final String location = sd.getLocation(); ValidTxnList validTxnList = @@ -266,6 +281,7 @@ public class Cleaner extends MetaStoreCompactorThread { if (removedFiles.value || isDynPartAbort(t, ci)) { 
txnHandler.markCleaned(ci); } else { + txnHandler.clearCleanerStart(ci); LOG.warn("No files were removed. Leaving queue entry " + ci + " in ready for cleaning state."); } } catch (Exception e) { @@ -371,4 +387,19 @@ public class Cleaner extends MetaStoreCompactorThread { } return true; } + + private static class CleanerCycleUpdater implements Runnable { + private final String metric; + private final long startedAt; + + CleanerCycleUpdater(String metric, long startedAt) { + this.metric = metric; + this.startedAt = startedAt; + } + + @Override + public void run() { + updateCycleDurationMetric(metric, startedAt); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index adebd31..bffa773 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -121,13 +121,24 @@ public class Initiator extends MetaStoreCompactorThread { // don't doom the entire thread. try { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name()); + startedAt = System.currentTimeMillis(); + long compactionInterval = (prevStart < 0) ? 
prevStart : (startedAt - prevStart) / 1000; + prevStart = startedAt; + if (metricsEnabled) { perfLogger.perfLogBegin(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE); + stopCycleUpdater(); + startCycleUpdater(HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_DURATION_UPDATE_INTERVAL, TimeUnit.MILLISECONDS), + new InitiatorCycleUpdater(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION, + startedAt, + MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_WARNING, + TimeUnit.MILLISECONDS), + MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_ERROR, + TimeUnit.MILLISECONDS))); } - startedAt = System.currentTimeMillis(); - - long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart)/1000; - prevStart = startedAt; final ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); @@ -191,7 +202,9 @@ public class Initiator extends MetaStoreCompactorThread { } if (metricsEnabled) { perfLogger.perfLogEnd(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE); + updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION, startedAt); } + stopCycleUpdater(); } long elapsedTime = System.currentTimeMillis() - startedAt; @@ -558,4 +571,38 @@ public class Initiator extends MetaStoreCompactorThread { name.append(threadId); return name.toString(); } + + private static class InitiatorCycleUpdater implements Runnable { + private final String metric; + private final long startedAt; + private final long warningThreshold; + private final long errorThreshold; + + private boolean errorReported; + private boolean warningReported; + + InitiatorCycleUpdater(String metric, long startedAt, + long warningThreshold, long errorThreshold) { + this.metric = metric; + this.startedAt = startedAt; + this.warningThreshold = warningThreshold; + this.errorThreshold = errorThreshold; + } + + @Override + public 
void run() { + long elapsed = updateCycleDurationMetric(metric, startedAt); + if (elapsed >= errorThreshold) { + if (!errorReported) { + LOG.error("Long running Initiator has been detected, duration {}", elapsed); + errorReported = true; + } + } else if (elapsed >= warningThreshold) { + if (!warningReported && !errorReported) { + warningReported = true; + LOG.warn("Long running Initiator has been detected, duration {}", elapsed); + } + } + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java index 7bf1304..4184caf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -31,6 +33,9 @@ import org.apache.thrift.TException; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf; @@ -44,6 +49,7 @@ public class MetaStoreCompactorThread extends CompactorThread implements 
MetaSto protected TxnStore txnHandler; protected int threadId; + protected ScheduledExecutorService cycleUpdaterExecutorService; @Override public void setThreadId(int threadId) { @@ -94,4 +100,37 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto throw new MetaException(e.toString()); } } + + protected void startCycleUpdater(long updateInterval, Runnable taskToRun) { + if (cycleUpdaterExecutorService == null) { + if (updateInterval > 0) { + cycleUpdaterExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setPriority(Thread.currentThread().getPriority()) + .setDaemon(true) + .setNameFormat("Cycle-Duration-Updater-%d") + .build()); + cycleUpdaterExecutorService.scheduleAtFixedRate( + taskToRun, + updateInterval, updateInterval, TimeUnit.MILLISECONDS); + } + } + } + + protected void stopCycleUpdater() { + if (cycleUpdaterExecutorService != null) { + cycleUpdaterExecutorService.shutdownNow(); + cycleUpdaterExecutorService = null; + } + } + + protected static long updateCycleDurationMetric(String metric, long startedAt) { + if (startedAt >= 0) { + long elapsed = System.currentTimeMillis() - startedAt; + LOG.debug("Updating {} metric to {}", metric, elapsed); + Metrics.getOrCreateGauge(metric) + .set((int)elapsed); + return elapsed; + } + return 0; + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java index 13c97fb..75c722b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.txn.compactor; import com.codahale.metrics.Counter; +import com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import 
org.apache.hadoop.hive.common.metrics.common.MetricsFactory; @@ -434,7 +435,7 @@ public class TestCompactionMetrics extends CompactorTest { elements.add(generateElement(4,"db", "tb3", "p1", CompactionType.MINOR, TxnStore.FAILED_RESPONSE)); elements.add(generateElement(6,"db1", "tb", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE, - System.currentTimeMillis(), true, "4.0.0", "4.0.0")); + System.currentTimeMillis(), true, "4.0.0", "4.0.0", 10)); elements.add(generateElement(7,"db1", "tb2", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE)); elements.add(generateElement(8,"db1", "tb3", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE)); @@ -446,12 +447,12 @@ public class TestCompactionMetrics extends CompactorTest { elements.add(generateElement(13,"db3", "tb3", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE)); // test null initiator version and worker version elements.add(generateElement(14,"db3", "tb4", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, - System.currentTimeMillis(), false, null, null)); + System.currentTimeMillis(), false, null, null,20)); elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, - System.currentTimeMillis(),true, "4.0.0", "4.0.0")); + System.currentTimeMillis(),true, "4.0.0", "4.0.0", 30)); elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE)); elements.add(generateElement(17,"db3", "tb7", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, - System.currentTimeMillis(),true, "4.0.0", "4.0.0")); + System.currentTimeMillis(),true, "4.0.0", "4.0.0",40)); scr.setCompacts(elements); AcidMetricService.updateMetricsFromShowCompact(scr, conf); @@ -483,44 +484,91 @@ public class TestCompactionMetrics extends CompactorTest { @Test public void testAgeMetricsNotSet() { - ShowCompactResponse scr = new ShowCompactResponse(); - List<ShowCompactResponseElement> elements = new ArrayList<>(); - elements.add(generateElement(1, "db", 
"tb", null, CompactionType.MAJOR, TxnStore.FAILED_RESPONSE, 1L)); - elements.add(generateElement(5, "db", "tb3", "p1", CompactionType.MINOR, TxnStore.DID_NOT_INITIATE_RESPONSE, 2L)); - elements.add(generateElement(9, "db2", "tb", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE, 3L)); - elements.add(generateElement(13, "db3", "tb3", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, 4L)); - elements.add(generateElement(14, "db3", "tb4", null, CompactionType.MINOR, TxnStore.CLEANING_RESPONSE, 5L)); + List<ShowCompactResponseElement> elements = ImmutableList.of( + generateElement(1, "db", "tb", null, CompactionType.MAJOR, TxnStore.FAILED_RESPONSE, 1L), + generateElement(5, "db", "tb3", "p1", CompactionType.MINOR, TxnStore.DID_NOT_INITIATE_RESPONSE, 2L), + generateElement(9, "db2", "tb", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE, 3L) + ); + ShowCompactResponse scr = new ShowCompactResponse(); scr.setCompacts(elements); AcidMetricService.updateMetricsFromShowCompact(scr, conf); + // Check that it is not set Assert.assertEquals(0, Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()); + Assert.assertEquals(0, Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue()); + Assert.assertEquals(0, Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue()); } @Test - public void testAgeMetricsAge() { + public void testInitiatedAgeMetrics() { ShowCompactResponse scr = new ShowCompactResponse(); - List<ShowCompactResponseElement> elements = new ArrayList<>(); long start = System.currentTimeMillis() - 1000L; - elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, start)); + List<ShowCompactResponseElement> elements = ImmutableList.of( + generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, start) + ); scr.setCompacts(elements); AcidMetricService.updateMetricsFromShowCompact(scr, conf); - 
long diff = (System.currentTimeMillis() - start)/1000; + long diff = (System.currentTimeMillis() - start) / 1000; + // Check that we have at least 1s old compaction age, but not more than expected - Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue() <= diff); - Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue() >= 1); + int age = Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue(); + Assert.assertTrue(age <= diff); + Assert.assertTrue(age >= 1); } @Test - public void testAgeMetricsOrder() { + public void testWorkingAgeMetrics() { + ShowCompactResponse scr = new ShowCompactResponse(); + + long start = System.currentTimeMillis() - 1000L; + List<ShowCompactResponseElement> elements = ImmutableList.of( + generateElement(17, "db3", "tb7", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, + System.currentTimeMillis(), true, "4.0.0", "4.0.0", start) + ); + + scr.setCompacts(elements); + AcidMetricService.updateMetricsFromShowCompact(scr, conf); + long diff = (System.currentTimeMillis() - start) / 1000; + + // Check that we have at least 1s old compaction age, but not more than expected + int age = Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue(); + Assert.assertTrue(age <= diff); + Assert.assertTrue(age >= 1); + } + + @Test + public void testCleaningAgeMetrics() { + ShowCompactResponse scr = new ShowCompactResponse(); + + long start = System.currentTimeMillis() - 1000L; + List<ShowCompactResponseElement> elements = ImmutableList.of( + generateElement(19, "db3", "tb7", null, CompactionType.MINOR, TxnStore.CLEANING_RESPONSE, + System.currentTimeMillis(), true, "4.0.0", "4.0.0", -1L, start) + ); + + scr.setCompacts(elements); + AcidMetricService.updateMetricsFromShowCompact(scr, conf); + long diff = (System.currentTimeMillis() - start) / 1000; + + // Check that we have at least 1s old compaction age, but 
not more than expected + int age = Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue(); + Assert.assertTrue(age <= diff); + Assert.assertTrue(age >= 1); + } + + @Test + public void testInitiatedAgeMetricsOrder() { ShowCompactResponse scr = new ShowCompactResponse(); long start = System.currentTimeMillis(); - List<ShowCompactResponseElement> elements = new ArrayList<>(); - elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, - start - 1000L)); - elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, - start - 100000L)); + + List<ShowCompactResponseElement> elements = ImmutableList.of( + generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, + start - 1_000L), + generateElement(16, "db3", "tb6", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, + start - 15_000L) + ); scr.setCompacts(elements); AcidMetricService.updateMetricsFromShowCompact(scr, conf); @@ -528,14 +576,79 @@ public class TestCompactionMetrics extends CompactorTest { Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue() > 10); // Check the reverse order - elements = new ArrayList<>(); - elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, - start - 100000L)); - elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, - start - 1000L)); + elements = ImmutableList.of( + generateElement(16, "db3", "tb6", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, + start - 25_000L), + generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, + start - 1_000L) + ); + scr.setCompacts(elements); + AcidMetricService.updateMetricsFromShowCompact(scr, conf); + + // Check that the age is older than 20s + 
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue() > 20); + } + + @Test + public void testWorkingAgeMetricsOrder() { + ShowCompactResponse scr = new ShowCompactResponse(); + long start = System.currentTimeMillis(); + + List<ShowCompactResponseElement> elements = ImmutableList.of( + generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, + start, false, "4.0.0", "4.0.0", start - 1_000L), + generateElement(16, "db3", "tb6", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, + start, false, "4.0.0", "4.0.0", start - 15_000L) + ); + + scr.setCompacts(elements); + AcidMetricService.updateMetricsFromShowCompact(scr, conf); + // Check that the age is older than 10s + Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue() > 10); + + // Check the reverse order + elements = ImmutableList.of( + generateElement(16, "db3", "tb6", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, + start, false, "4.0.0", "4.0.0", start - 25_000L), + generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE, + start, false, "4.0.0", "4.0.0", start - 1_000L) + ); + scr.setCompacts(elements); + AcidMetricService.updateMetricsFromShowCompact(scr, conf); + + // Check that the age is older than 20s + Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue() > 20); + } + @Test + public void testCleaningAgeMetricsOrder() { + ShowCompactResponse scr = new ShowCompactResponse(); + long start = System.currentTimeMillis(); + + List<ShowCompactResponseElement> elements = ImmutableList.of( + generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.CLEANING_RESPONSE, + start, false, "4.0.0", "4.0.0", -1L, start - 1_000L), + generateElement(16, "db3", "tb6", null, CompactionType.MINOR, TxnStore.CLEANING_RESPONSE, + start, false, "4.0.0", "4.0.0", -1L, start - 15_000L) + ); + + 
scr.setCompacts(elements); + AcidMetricService.updateMetricsFromShowCompact(scr, conf); // Check that the age is older than 10s - Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue() > 10); + Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue() > 10); + + // Check the reverse order + elements = ImmutableList.of( + generateElement(16, "db3", "tb6", null, CompactionType.MINOR, TxnStore.CLEANING_RESPONSE, + start, false, "4.0.0", "4.0.0", -1L, start - 25_000L), + generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.CLEANING_RESPONSE, + start, false, "4.0.0", "4.0.0", -1L, start - 1_000L) + ); + scr.setCompacts(elements); + AcidMetricService.updateMetricsFromShowCompact(scr, conf); + + // Check that the age is older than 20s + Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue() > 20); } @Test @@ -741,12 +854,19 @@ public class TestCompactionMetrics extends CompactorTest { private ShowCompactResponseElement generateElement(long id, String db, String table, String partition, CompactionType type, String state, long enqueueTime, boolean manuallyInitiatedCompaction) { return generateElement(id, db, table, partition, type, state, enqueueTime, manuallyInitiatedCompaction, - null, null); + null, null, -1); + } + + private ShowCompactResponseElement generateElement(long id, String db, String table, String partition, + CompactionType type, String state, long enqueueTime, boolean manuallyInitiatedCompaction, + String initiatorVersion, String workerVersion, long startTime) { + return generateElement(id, db, table, partition, type, state, enqueueTime, manuallyInitiatedCompaction, + initiatorVersion, workerVersion, startTime, -1L); } private ShowCompactResponseElement generateElement(long id, String db, String table, String partition, CompactionType type, String state, long enqueueTime, boolean 
manuallyInitiatedCompaction, - String initiatorVersion, String workerVersion) { + String initiatorVersion, String workerVersion, long startTime, long cleanerStartTime) { ShowCompactResponseElement element = new ShowCompactResponseElement(db, table, type, state); element.setId(id); element.setPartitionname(partition); @@ -764,6 +884,8 @@ public class TestCompactionMetrics extends CompactorTest { element.setWorkerid(workerId); element.setInitiatorVersion(initiatorVersion); element.setWorkerVersion(workerVersion); + element.setStart(startTime); + element.setCleanerStart(cleanerStartTime); return element; } diff --git a/ql/src/test/results/clientpositive/llap/sysdb.q.out b/ql/src/test/results/clientpositive/llap/sysdb.q.out index da1cf94..80b6b49 100644 --- a/ql/src/test/results/clientpositive/llap/sysdb.q.out +++ b/ql/src/test/results/clientpositive/llap/sysdb.q.out @@ -465,6 +465,7 @@ columns_v2 column_name columns_v2 comment columns_v2 integer_idx columns_v2 type_name +compaction_queue cq_cleaner_start compaction_queue cq_database compaction_queue cq_enqueue_time compaction_queue cq_error_message @@ -484,6 +485,8 @@ compaction_queue cq_worker_id compaction_queue cq_worker_version compactions c_catalog compactions c_catalog +compactions c_cleaner_start +compactions c_cleaner_start compactions c_database compactions c_database compactions c_duration @@ -1552,8 +1555,8 @@ POSTHOOK: Input: sys@compaction_queue POSTHOOK: Input: sys@compactions POSTHOOK: Input: sys@completed_compactions #### A masked pattern was here #### -1 default default scr_txn NULL major initiated NULL NULL NULL #Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT -2 default default scr_txn_2 NULL minor initiated NULL NULL NULL #Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT +1 default default scr_txn NULL major initiated NULL NULL NULL #Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT NULL +2 default default scr_txn_2 NULL minor initiated NULL NULL NULL 
#Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT NULL PREHOOK: query: use INFORMATION_SCHEMA PREHOOK: type: SWITCHDATABASE PREHOOK: Input: database:information_schema @@ -1781,5 +1784,5 @@ POSTHOOK: Input: sys@dbs POSTHOOK: Input: sys@tbl_privs POSTHOOK: Input: sys@tbls #### A masked pattern was here #### -1 default default scr_txn NULL major initiated NULL NULL NULL #Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT -2 default default scr_txn_2 NULL minor initiated NULL NULL NULL #Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT +1 default default scr_txn NULL major initiated NULL NULL NULL #Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT NULL +2 default default scr_txn_2 NULL minor initiated NULL NULL NULL #Masked# NULL NULL NULL NULL NULL #Masked# manual 4.0.0-SNAPSHOT NULL diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 0b50045..71e30ab 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -27547,6 +27547,11 @@ void ShowCompactResponseElement::__set_initiatorVersion(const std::string& val) this->initiatorVersion = val; __isset.initiatorVersion = true; } + +void ShowCompactResponseElement::__set_cleanerStart(const int64_t val) { + this->cleanerStart = val; +__isset.cleanerStart = true; +} std::ostream& operator<<(std::ostream& out, const ShowCompactResponseElement& obj) { obj.printTo(out); @@ -27725,6 +27730,14 @@ uint32_t ShowCompactResponseElement::read(::apache::thrift::protocol::TProtocol* xfer += iprot->skip(ftype); } break; + case 19: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->cleanerStart); + this->__isset.cleanerStart = true; + } else { + xfer += iprot->skip(ftype); + } + 
break; default: xfer += iprot->skip(ftype); break; @@ -27836,6 +27849,11 @@ uint32_t ShowCompactResponseElement::write(::apache::thrift::protocol::TProtocol xfer += oprot->writeString(this->initiatorVersion); xfer += oprot->writeFieldEnd(); } + if (this->__isset.cleanerStart) { + xfer += oprot->writeFieldBegin("cleanerStart", ::apache::thrift::protocol::T_I64, 19); + xfer += oprot->writeI64(this->cleanerStart); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -27861,6 +27879,7 @@ void swap(ShowCompactResponseElement &a, ShowCompactResponseElement &b) { swap(a.workerVersion, b.workerVersion); swap(a.initiatorId, b.initiatorId); swap(a.initiatorVersion, b.initiatorVersion); + swap(a.cleanerStart, b.cleanerStart); swap(a.__isset, b.__isset); } @@ -27883,6 +27902,7 @@ ShowCompactResponseElement::ShowCompactResponseElement(const ShowCompactResponse workerVersion = other984.workerVersion; initiatorId = other984.initiatorId; initiatorVersion = other984.initiatorVersion; + cleanerStart = other984.cleanerStart; __isset = other984.__isset; } ShowCompactResponseElement& ShowCompactResponseElement::operator=(const ShowCompactResponseElement& other985) { @@ -27904,6 +27924,7 @@ ShowCompactResponseElement& ShowCompactResponseElement::operator=(const ShowComp workerVersion = other985.workerVersion; initiatorId = other985.initiatorId; initiatorVersion = other985.initiatorVersion; + cleanerStart = other985.cleanerStart; __isset = other985.__isset; return *this; } @@ -27928,6 +27949,7 @@ void ShowCompactResponseElement::printTo(std::ostream& out) const { out << ", " << "workerVersion="; (__isset.workerVersion ? (out << to_string(workerVersion)) : (out << "<null>")); out << ", " << "initiatorId="; (__isset.initiatorId ? (out << to_string(initiatorId)) : (out << "<null>")); out << ", " << "initiatorVersion="; (__isset.initiatorVersion ? 
(out << to_string(initiatorVersion)) : (out << "<null>")); + out << ", " << "cleanerStart="; (__isset.cleanerStart ? (out << to_string(cleanerStart)) : (out << "<null>")); out << ")"; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h index 3e5eb35..1d4d544 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -10407,7 +10407,7 @@ void swap(ShowCompactRequest &a, ShowCompactRequest &b); std::ostream& operator<<(std::ostream& out, const ShowCompactRequest& obj); typedef struct _ShowCompactResponseElement__isset { - _ShowCompactResponseElement__isset() : partitionname(false), workerid(false), start(false), runAs(false), hightestTxnId(false), metaInfo(false), endTime(false), hadoopJobId(true), id(false), errorMessage(false), enqueueTime(false), workerVersion(false), initiatorId(false), initiatorVersion(false) {} + _ShowCompactResponseElement__isset() : partitionname(false), workerid(false), start(false), runAs(false), hightestTxnId(false), metaInfo(false), endTime(false), hadoopJobId(true), id(false), errorMessage(false), enqueueTime(false), workerVersion(false), initiatorId(false), initiatorVersion(false), cleanerStart(false) {} bool partitionname :1; bool workerid :1; bool start :1; @@ -10422,6 +10422,7 @@ typedef struct _ShowCompactResponseElement__isset { bool workerVersion :1; bool initiatorId :1; bool initiatorVersion :1; + bool cleanerStart :1; } _ShowCompactResponseElement__isset; class ShowCompactResponseElement : public virtual ::apache::thrift::TBase { @@ -10429,7 +10430,7 @@ class ShowCompactResponseElement : public virtual ::apache::thrift::TBase { ShowCompactResponseElement(const ShowCompactResponseElement&); ShowCompactResponseElement& operator=(const ShowCompactResponseElement&); - 
ShowCompactResponseElement() : dbname(), tablename(), partitionname(), type((CompactionType::type)0), state(), workerid(), start(0), runAs(), hightestTxnId(0), metaInfo(), endTime(0), hadoopJobId("None"), id(0), errorMessage(), enqueueTime(0), workerVersion(), initiatorId(), initiatorVersion() { + ShowCompactResponseElement() : dbname(), tablename(), partitionname(), type((CompactionType::type)0), state(), workerid(), start(0), runAs(), hightestTxnId(0), metaInfo(), endTime(0), hadoopJobId("None"), id(0), errorMessage(), enqueueTime(0), workerVersion(), initiatorId(), initiatorVersion(), cleanerStart(0) { } virtual ~ShowCompactResponseElement() noexcept; @@ -10455,6 +10456,7 @@ class ShowCompactResponseElement : public virtual ::apache::thrift::TBase { std::string workerVersion; std::string initiatorId; std::string initiatorVersion; + int64_t cleanerStart; _ShowCompactResponseElement__isset __isset; @@ -10494,6 +10496,8 @@ class ShowCompactResponseElement : public virtual ::apache::thrift::TBase { void __set_initiatorVersion(const std::string& val); + void __set_cleanerStart(const int64_t val); + bool operator == (const ShowCompactResponseElement & rhs) const { if (!(dbname == rhs.dbname)) @@ -10560,6 +10564,10 @@ class ShowCompactResponseElement : public virtual ::apache::thrift::TBase { return false; else if (__isset.initiatorVersion && !(initiatorVersion == rhs.initiatorVersion)) return false; + if (__isset.cleanerStart != rhs.__isset.cleanerStart) + return false; + else if (__isset.cleanerStart && !(cleanerStart == rhs.cleanerStart)) + return false; return true; } bool operator != (const ShowCompactResponseElement &rhs) const { diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java index dff37fe..d0245ea 100644 --- 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java @@ -29,6 +29,7 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.protocol.TField WORKER_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("workerVersion", org.apache.thrift.protocol.TType.STRING, (short)16); private static final org.apache.thrift.protocol.TField INITIATOR_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("initiatorId", org.apache.thrift.protocol.TType.STRING, (short)17); private static final org.apache.thrift.protocol.TField INITIATOR_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("initiatorVersion", org.apache.thrift.protocol.TType.STRING, (short)18); + private static final org.apache.thrift.protocol.TField CLEANER_START_FIELD_DESC = new org.apache.thrift.protocol.TField("cleanerStart", org.apache.thrift.protocol.TType.I64, (short)19); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ShowCompactResponseElementStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ShowCompactResponseElementTupleSchemeFactory(); @@ -51,6 +52,7 @@ package org.apache.hadoop.hive.metastore.api; private @org.apache.thrift.annotation.Nullable java.lang.String workerVersion; // optional private @org.apache.thrift.annotation.Nullable java.lang.String initiatorId; // optional private @org.apache.thrift.annotation.Nullable java.lang.String initiatorVersion; // optional + private long cleanerStart; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -75,7 +77,8 @@ package org.apache.hadoop.hive.metastore.api; ENQUEUE_TIME((short)15, "enqueueTime"), WORKER_VERSION((short)16, "workerVersion"), INITIATOR_ID((short)17, "initiatorId"), - INITIATOR_VERSION((short)18, "initiatorVersion"); + INITIATOR_VERSION((short)18, "initiatorVersion"), + CLEANER_START((short)19, "cleanerStart"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -127,6 +130,8 @@ package org.apache.hadoop.hive.metastore.api; return INITIATOR_ID; case 18: // INITIATOR_VERSION return INITIATOR_VERSION; + case 19: // CLEANER_START + return CLEANER_START; default: return null; } @@ -173,8 +178,9 @@ package org.apache.hadoop.hive.metastore.api; private static final int __ENDTIME_ISSET_ID = 2; private static final int __ID_ISSET_ID = 3; private static final int __ENQUEUETIME_ISSET_ID = 4; + private static final int __CLEANERSTART_ISSET_ID = 5; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.WORKERID,_Fields.START,_Fields.RUN_AS,_Fields.HIGHTEST_TXN_ID,_Fields.META_INFO,_Fields.END_TIME,_Fields.HADOOP_JOB_ID,_Fields.ID,_Fields.ERROR_MESSAGE,_Fields.ENQUEUE_TIME,_Fields.WORKER_VERSION,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION}; + private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.WORKERID,_Fields.START,_Fields.RUN_AS,_Fields.HIGHTEST_TXN_ID,_Fields.META_INFO,_Fields.END_TIME,_Fields.HADOOP_JOB_ID,_Fields.ID,_Fields.ERROR_MESSAGE,_Fields.ENQUEUE_TIME,_Fields.WORKER_VERSION,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION,_Fields.CLEANER_START}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -214,6 
+220,8 @@ package org.apache.hadoop.hive.metastore.api; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.INITIATOR_VERSION, new org.apache.thrift.meta_data.FieldMetaData("initiatorVersion", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.CLEANER_START, new org.apache.thrift.meta_data.FieldMetaData("cleanerStart", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ShowCompactResponseElement.class, metaDataMap); } @@ -285,6 +293,7 @@ package org.apache.hadoop.hive.metastore.api; if (other.isSetInitiatorVersion()) { this.initiatorVersion = other.initiatorVersion; } + this.cleanerStart = other.cleanerStart; } public ShowCompactResponseElement deepCopy() { @@ -317,6 +326,8 @@ package org.apache.hadoop.hive.metastore.api; this.workerVersion = null; this.initiatorId = null; this.initiatorVersion = null; + setCleanerStartIsSet(false); + this.cleanerStart = 0; } @org.apache.thrift.annotation.Nullable @@ -749,6 +760,28 @@ package org.apache.hadoop.hive.metastore.api; } } + public long getCleanerStart() { + return this.cleanerStart; + } + + public void setCleanerStart(long cleanerStart) { + this.cleanerStart = cleanerStart; + setCleanerStartIsSet(true); + } + + public void unsetCleanerStart() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __CLEANERSTART_ISSET_ID); + } + + /** Returns true if field cleanerStart is set (has been assigned a value) and false otherwise */ + public boolean isSetCleanerStart() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __CLEANERSTART_ISSET_ID); + } + + public void setCleanerStartIsSet(boolean 
value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __CLEANERSTART_ISSET_ID, value); + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case DBNAME: @@ -895,6 +928,14 @@ package org.apache.hadoop.hive.metastore.api; } break; + case CLEANER_START: + if (value == null) { + unsetCleanerStart(); + } else { + setCleanerStart((java.lang.Long)value); + } + break; + } } @@ -955,6 +996,9 @@ package org.apache.hadoop.hive.metastore.api; case INITIATOR_VERSION: return getInitiatorVersion(); + case CLEANER_START: + return getCleanerStart(); + } throw new java.lang.IllegalStateException(); } @@ -1002,6 +1046,8 @@ package org.apache.hadoop.hive.metastore.api; return isSetInitiatorId(); case INITIATOR_VERSION: return isSetInitiatorVersion(); + case CLEANER_START: + return isSetCleanerStart(); } throw new java.lang.IllegalStateException(); } @@ -1181,6 +1227,15 @@ package org.apache.hadoop.hive.metastore.api; return false; } + boolean this_present_cleanerStart = true && this.isSetCleanerStart(); + boolean that_present_cleanerStart = true && that.isSetCleanerStart(); + if (this_present_cleanerStart || that_present_cleanerStart) { + if (!(this_present_cleanerStart && that_present_cleanerStart)) + return false; + if (this.cleanerStart != that.cleanerStart) + return false; + } + return true; } @@ -1260,6 +1315,10 @@ package org.apache.hadoop.hive.metastore.api; if (isSetInitiatorVersion()) hashCode = hashCode * 8191 + initiatorVersion.hashCode(); + hashCode = hashCode * 8191 + ((isSetCleanerStart()) ? 
131071 : 524287); + if (isSetCleanerStart()) + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(cleanerStart); + return hashCode; } @@ -1451,6 +1510,16 @@ package org.apache.hadoop.hive.metastore.api; return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetCleanerStart(), other.isSetCleanerStart()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCleanerStart()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.cleanerStart, other.cleanerStart); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1623,6 +1692,12 @@ package org.apache.hadoop.hive.metastore.api; } first = false; } + if (isSetCleanerStart()) { + if (!first) sb.append(", "); + sb.append("cleanerStart:"); + sb.append(this.cleanerStart); + first = false; + } sb.append(")"); return sb.toString(); } @@ -1828,6 +1903,14 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 19: // CLEANER_START + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.cleanerStart = iprot.readI64(); + struct.setCleanerStartIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1949,6 +2032,11 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeFieldEnd(); } } + if (struct.isSetCleanerStart()) { + oprot.writeFieldBegin(CLEANER_START_FIELD_DESC); + oprot.writeI64(struct.cleanerStart); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -2013,7 +2101,10 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetInitiatorVersion()) { optionals.set(13); } - oprot.writeBitSet(optionals, 14); + if (struct.isSetCleanerStart()) { + optionals.set(14); + } + oprot.writeBitSet(optionals, 15); if (struct.isSetPartitionname()) { 
oprot.writeString(struct.partitionname); } @@ -2056,6 +2147,9 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetInitiatorVersion()) { oprot.writeString(struct.initiatorVersion); } + if (struct.isSetCleanerStart()) { + oprot.writeI64(struct.cleanerStart); + } } @Override @@ -2069,7 +2163,7 @@ package org.apache.hadoop.hive.metastore.api; struct.setTypeIsSet(true); struct.state = iprot.readString(); struct.setStateIsSet(true); - java.util.BitSet incoming = iprot.readBitSet(14); + java.util.BitSet incoming = iprot.readBitSet(15); if (incoming.get(0)) { struct.partitionname = iprot.readString(); struct.setPartitionnameIsSet(true); @@ -2126,6 +2220,10 @@ package org.apache.hadoop.hive.metastore.api; struct.initiatorVersion = iprot.readString(); struct.setInitiatorVersionIsSet(true); } + if (incoming.get(14)) { + struct.cleanerStart = iprot.readI64(); + struct.setCleanerStartIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php index a05ccf5..a66b43f 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php @@ -112,6 +112,11 @@ class ShowCompactResponseElement 'isRequired' => false, 'type' => TType::STRING, ), + 19 => array( + 'var' => 'cleanerStart', + 'isRequired' => false, + 'type' => TType::I64, + ), ); /** @@ -186,6 +191,10 @@ class ShowCompactResponseElement * @var string */ public $initiatorVersion = null; + /** + * @var int + */ + public $cleanerStart = null; public function __construct($vals = null) { @@ -244,6 +253,9 @@ class ShowCompactResponseElement if (isset($vals['initiatorVersion'])) { $this->initiatorVersion = $vals['initiatorVersion']; } + if (isset($vals['cleanerStart'])) { + 
$this->cleanerStart = $vals['cleanerStart']; + } } } @@ -392,6 +404,13 @@ class ShowCompactResponseElement $xfer += $input->skip($ftype); } break; + case 19: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->cleanerStart); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -496,6 +515,11 @@ class ShowCompactResponseElement $xfer += $output->writeString($this->initiatorVersion); $xfer += $output->writeFieldEnd(); } + if ($this->cleanerStart !== null) { + $xfer += $output->writeFieldBegin('cleanerStart', TType::I64, 19); + $xfer += $output->writeI64($this->cleanerStart); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index d4a0917..f80294b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -15751,11 +15751,12 @@ class ShowCompactResponseElement(object): - workerVersion - initiatorId - initiatorVersion + - cleanerStart """ - def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, state=None, workerid=None, start=None, runAs=None, hightestTxnId=None, metaInfo=None, endTime=None, hadoopJobId="None", id=None, errorMessage=None, enqueueTime=None, workerVersion=None, initiatorId=None, initiatorVersion=None,): + def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, state=None, workerid=None, start=None, runAs=None, hightestTxnId=None, metaInfo=None, endTime=None, hadoopJobId="None", id=None, errorMessage=None, enqueueTime=None, workerVersion=None, initiatorId=None, initiatorVersion=None, cleanerStart=None,): self.dbname = dbname self.tablename = tablename 
self.partitionname = partitionname @@ -15774,6 +15775,7 @@ class ShowCompactResponseElement(object): self.workerVersion = workerVersion self.initiatorId = initiatorId self.initiatorVersion = initiatorVersion + self.cleanerStart = cleanerStart def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -15874,6 +15876,11 @@ class ShowCompactResponseElement(object): self.initiatorVersion = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString() else: iprot.skip(ftype) + elif fid == 19: + if ftype == TType.I64: + self.cleanerStart = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -15956,6 +15963,10 @@ class ShowCompactResponseElement(object): oprot.writeFieldBegin('initiatorVersion', TType.STRING, 18) oprot.writeString(self.initiatorVersion.encode('utf-8') if sys.version_info[0] == 2 else self.initiatorVersion) oprot.writeFieldEnd() + if self.cleanerStart is not None: + oprot.writeFieldBegin('cleanerStart', TType.I64, 19) + oprot.writeI64(self.cleanerStart) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -30425,6 +30436,7 @@ ShowCompactResponseElement.thrift_spec = ( (16, TType.STRING, 'workerVersion', 'UTF8', None, ), # 16 (17, TType.STRING, 'initiatorId', 'UTF8', None, ), # 17 (18, TType.STRING, 'initiatorVersion', 'UTF8', None, ), # 18 + (19, TType.I64, 'cleanerStart', None, None, ), # 19 ) all_structs.append(ShowCompactResponse) ShowCompactResponse.thrift_spec = ( diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index f08e16f..779898b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ 
-4578,6 +4578,7 @@ class ShowCompactResponseElement WORKERVERSION = 16 INITIATORID = 17 INITIATORVERSION = 18 + CLEANERSTART = 19 FIELDS = { DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, @@ -4597,7 +4598,8 @@ class ShowCompactResponseElement ENQUEUETIME => {:type => ::Thrift::Types::I64, :name => 'enqueueTime', :optional => true}, WORKERVERSION => {:type => ::Thrift::Types::STRING, :name => 'workerVersion', :optional => true}, INITIATORID => {:type => ::Thrift::Types::STRING, :name => 'initiatorId', :optional => true}, - INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name => 'initiatorVersion', :optional => true} + INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name => 'initiatorVersion', :optional => true}, + CLEANERSTART => {:type => ::Thrift::Types::I64, :name => 'cleanerStart', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 448ea6a..2feff8c 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -494,6 +494,18 @@ public class MetastoreConf { 12, TimeUnit.HOURS, "Age of oldest initiated compaction in the compaction queue after which an error will be logged. " + "Default time unit is: hours"), + COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_WARNING( + "metastore.compactor.long.running.initiator.threshold.warning", + "hive.compactor.long.running.initiator.threshold.warning", + 6, TimeUnit.HOURS, + "Initiator cycle duration after which a warning will be logged. 
" + + "Default time unit is: hours"), + COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_ERROR( + "metastore.compactor.long.running.initiator.threshold.error", + "hive.compactor.long.running.initiator.threshold.error", + 12, TimeUnit.HOURS, + "Initiator cycle duration after which an error will be logged. " + + "Default time unit is: hours"), COMPACTOR_COMPLETED_TXN_COMPONENTS_RECORD_THRESHOLD_WARNING( "metastore.compactor.completed.txn.components.record.threshold.warning", "hive.compactor.completed.txn.components.record.threshold.warning", diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index c33491e..7241cc6 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1327,7 +1327,8 @@ struct ShowCompactResponseElement { 15: optional i64 enqueueTime, 16: optional string workerVersion, 17: optional string initiatorId, - 18: optional string initiatorVersion + 18: optional string initiatorVersion, + 19: optional i64 cleanerStart } struct ShowCompactResponse { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java index 7870c51..ec8208c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java @@ -45,7 +45,9 @@ import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTI import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATOR_VERSIONS; import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_WORKERS; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_WORKER_VERSIONS; +import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE; +import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_WORKING_AGE; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_STATUS_PREFIX; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.NUM_ABORTED_TXNS; import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.NUM_COMPLETED_TXN_COMPONENTS; @@ -268,16 +270,34 @@ public class AcidMetricService implements MetastoreTaskThread { public static void updateMetricsFromShowCompact(ShowCompactResponse showCompactResponse, Configuration conf) { Map<String, ShowCompactResponseElement> lastElements = new HashMap<>(); long oldestEnqueueTime = Long.MAX_VALUE; + long oldestWorkingTime = Long.MAX_VALUE; + long oldestCleaningTime = Long.MAX_VALUE; // Get the last compaction for each db/table/partition for(ShowCompactResponseElement element : showCompactResponse.getCompacts()) { String key = element.getDbname() + "/" + element.getTablename() + (element.getPartitionname() != null ? "/" + element.getPartitionname() : ""); + // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? 
element : old)); - if (TxnStore.INITIATED_RESPONSE.equals(element.getState()) && oldestEnqueueTime > element.getEnqueueTime()) { + + // find the oldest elements with initiated and working states + String state = element.getState(); + if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > element.getEnqueueTime())) { oldestEnqueueTime = element.getEnqueueTime(); } + + if (element.isSetStart()) { + if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > element.getStart())) { + oldestWorkingTime = element.getStart(); + } + } + + if (element.isSetCleanerStart()) { + if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > element.getCleanerStart())) { + oldestCleaningTime = element.getCleanerStart(); + } + } } // Get the current count for each state @@ -304,24 +324,13 @@ public class AcidMetricService implements MetastoreTaskThread { LOG.warn("Many compactions are failing. Check root cause of failed/not initiated compactions."); } - if (oldestEnqueueTime == Long.MAX_VALUE) { - Metrics.getOrCreateGauge(COMPACTION_OLDEST_ENQUEUE_AGE).set(0); - } else { - int oldestEnqueueAge = (int) ((System.currentTimeMillis() - oldestEnqueueTime) / 1000L); - Metrics.getOrCreateGauge(COMPACTION_OLDEST_ENQUEUE_AGE) - .set(oldestEnqueueAge); - if (oldestEnqueueAge >= MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING, TimeUnit.SECONDS) && - oldestEnqueueAge < MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR, TimeUnit.SECONDS)) { - LOG.warn("Found compaction entry in compaction queue with an age of " + oldestEnqueueAge + " seconds. 
" + - "Consider increasing the number of worker threads."); - } else if (oldestEnqueueAge >= MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR, TimeUnit.SECONDS)) { - LOG.error("Found compaction entry in compaction queue with an age of " + oldestEnqueueAge + " seconds. " + - "Consider increasing the number of worker threads"); - } - } + updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, oldestEnqueueTime, conf, + "Found compaction entry in compaction queue with an age of {} seconds. " + + "Consider increasing the number of worker threads.", + MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING, + MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR); + updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, oldestWorkingTime, conf); + updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, oldestCleaningTime, conf); long initiatorsCount = lastElements.values().stream() //manually initiated compactions don't count @@ -340,6 +349,30 @@ public class AcidMetricService implements MetastoreTaskThread { Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS).set((int) workerVersionsCount); } + private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf) { + updateOldestCompactionMetric(metricName, oldestTime, conf, null, null, null); + } + + private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf, + String logMessage, MetastoreConf.ConfVars warningThreshold, MetastoreConf.ConfVars errorThreshold) { + if (oldestTime == Long.MAX_VALUE) { + Metrics.getOrCreateGauge(metricName) + .set(0); + return; + } + + int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L); + Metrics.getOrCreateGauge(metricName) + .set(oldestAge); + if (logMessage != null) { + if (oldestAge >= MetastoreConf.getTimeVar(conf, errorThreshold, 
TimeUnit.SECONDS)) { + LOG.error(logMessage, oldestAge); + } else if (oldestAge >= MetastoreConf.getTimeVar(conf, warningThreshold, TimeUnit.SECONDS)) { + LOG.warn(logMessage, oldestAge); + } + } + } + @Override public void setConf(Configuration configuration) { this.conf = configuration; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java index 6676684..710f0f9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java @@ -22,9 +22,13 @@ public class MetricsConstants { public static final String API_PREFIX = "api_"; public static final String COMPACTION_STATUS_PREFIX = "compaction_num_"; public static final String COMPACTION_OLDEST_ENQUEUE_AGE = "compaction_oldest_enqueue_age_in_sec"; + public static final String COMPACTION_OLDEST_WORKING_AGE = "compaction_oldest_working_age_in_sec"; + public static final String COMPACTION_OLDEST_CLEANING_AGE = "compaction_oldest_cleaning_age_in_sec"; public static final String COMPACTION_INITIATOR_CYCLE = "compaction_initiator_cycle"; + public static final String COMPACTION_INITIATOR_CYCLE_DURATION = "compaction_initiator_cycle_duration"; public static final String COMPACTION_INITIATOR_FAILURE_COUNTER = "compaction_initiator_failure_counter"; public static final String COMPACTION_CLEANER_CYCLE = "compaction_cleaner_cycle"; + public static final String COMPACTION_CLEANER_CYCLE_DURATION = "compaction_cleaner_cycle_duration"; public static final String COMPACTION_CLEANER_FAILURE_COUNTER = "compaction_cleaner_failure_counter"; public static final String COMPACTION_WORKER_CYCLE = "compaction_worker_cycle"; diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 4a9a6ef..9cece8a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -325,9 +325,9 @@ class CompactionTxnHandler extends TxnHandler { public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long retentionTime) throws MetaException { try { List<CompactionInfo> rc = new ArrayList<>(); - + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - Statement stmt = dbConn.createStatement()) { + Statement stmt = dbConn.createStatement()) { /* * By filtering on minOpenTxnWaterMark, we will only cleanup after every transaction is committed, that could see * the uncompacted deltas. This way the cleaner can clean up everything that was made obsolete by this compaction. 
@@ -340,20 +340,20 @@ class CompactionTxnHandler extends TxnHandler { whereClause += " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ")"; } String s = "SELECT \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," + - " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\"" + - " FROM \"COMPACTION_QUEUE\" \"cq1\" " + - "INNER JOIN (" + - " SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" + - " FROM \"COMPACTION_QUEUE\"" - + whereClause + - " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"cq2\" " + - "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+ - " AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+ - " AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" + - " OR \"cq1\".\"CQ_PARTITION\" IS NULL AND \"cq2\".\"CQ_PARTITION\" IS NULL)" - + whereClause + - " AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" + - " ORDER BY \"CQ_ID\""; + " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\"" + + " FROM \"COMPACTION_QUEUE\" \"cq1\" " + + "INNER JOIN (" + + " SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" + + " FROM \"COMPACTION_QUEUE\"" + + whereClause + + " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"cq2\" " + + "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+ + " AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+ + " AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" + + " OR \"cq1\".\"CQ_PARTITION\" IS NULL AND \"cq2\".\"CQ_PARTITION\" IS NULL)" + + whereClause + + " AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" + + " ORDER BY \"CQ_ID\""; LOG.debug("Going to execute query <" + s + ">"); try (ResultSet rs = stmt.executeQuery(s)) { @@ -377,13 +377,110 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to select next element for cleaning, " + e.getMessage()); checkRetryable(e, "findReadyToClean"); throw 
new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } + StringUtils.stringifyException(e)); + } } catch (RetryException e) { return findReadyToClean(minOpenTxnWaterMark, retentionTime); } }
+
+  /**
+   * Mark the cleaning start time for a particular compaction.
+   *
+   * Opens a READ_COMMITTED connection, reads the current database time via
+   * {@code getDbTime} and stores it through {@code setCleanerStart}. If the helper
+   * signals a {@code RetryException} the whole operation is attempted again.
+   *
+   * NOTE(review): this method issues an UPDATE yet is annotated ReadOnly - confirm
+   * the intended retry semantics.
+   *
+   * @param info info on the compaction entry
+   * @throws MetaException if the update fails with a SQLException
+   */
+  @Override
+  @RetrySemantics.ReadOnly
+  public void markCleanerStart(CompactionInfo info) throws MetaException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running markCleanerStart with CompactionInfo: " + info.toString());
+    }
+
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        long now = getDbTime(dbConn);
+        setCleanerStart(dbConn, info, now);
+      } catch (SQLException e) {
+        LOG.error("Unable to set the cleaner start time for compaction record " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "markCleanerStart(" + info + ")");
+        throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      markCleanerStart(info);
+    }
+  }
+
+  /**
+   * Removes the cleaning start time for a particular compaction.
+   *
+   * The value written is the sentinel -1, not SQL NULL. If the helper signals a
+   * {@code RetryException} the whole operation is attempted again.
+   *
+   * NOTE(review): this method issues an UPDATE yet is annotated ReadOnly - confirm
+   * the intended retry semantics.
+   *
+   * @param info info on the compaction entry
+   * @throws MetaException if the update fails with a SQLException
+   */
+  @Override
+  @RetrySemantics.ReadOnly
+  public void clearCleanerStart(CompactionInfo info) throws MetaException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running clearCleanerStart with CompactionInfo: " + info.toString());
+    }
+
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        setCleanerStart(dbConn, info, -1L);
+      } catch (SQLException e) {
+        LOG.error("Unable to clear the cleaner start time for compaction record " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "clearCleanerStart(" + info + ")");
+        throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      clearCleanerStart(info);
+    }
+  }
+
+  /**
+   * Writes {@code timestamp} into CQ_CLEANER_START of the COMPACTION_QUEUE row whose
+   * CQ_ID equals {@code info.id} and whose CQ_STATE equals READY_FOR_CLEANING.
+   *
+   * The transaction is committed when exactly one row was updated; any other update
+   * count is logged as an error and rolled back. No exception is raised in that case.
+   *
+   * @param dbConn    open connection the update (and its commit/rollback) runs on
+   * @param info      compaction entry; only its id is used for the row lookup
+   * @param timestamp value to store; a null reference stores SQL NULL
+   * @throws SQLException if preparing or executing the statement fails
+   */
+  private void setCleanerStart(Connection dbConn, CompactionInfo info, Long timestamp)
+      throws RetryException, SQLException {
+    long id = info.id;
+    PreparedStatement pStmt = null;
+    try {
+      // Only the state constant stays inline; the two variable values are bound as
+      // parameters so the statement text is the same for every call.
+      String query = "" +
+          " UPDATE " +
+          "  \"COMPACTION_QUEUE\" " +
+          " SET " +
+          "  \"CQ_CLEANER_START\" = ? " +
+          " WHERE " +
+          "  \"CQ_ID\" = ? " +
+          " AND " +
+          "  \"CQ_STATE\"='" + READY_FOR_CLEANING + "'";
+
+      pStmt = dbConn.prepareStatement(query);
+      if (timestamp == null) {
+        // Keeps the old behavior of a null Long ending up as NULL in the column.
+        pStmt.setNull(1, java.sql.Types.BIGINT);
+      } else {
+        pStmt.setLong(1, timestamp);
+      }
+      pStmt.setLong(2, id);
+      LOG.debug("Going to execute update <" + query + "> for CQ_ID=" + id + ", CQ_CLEANER_START=" + timestamp);
+      int updCount = pStmt.executeUpdate();
+      if (updCount != 1) {
+        LOG.error("Unable to update compaction record: " + info + ". Update count=" + updCount);
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+      } else {
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      }
+    } finally {
+      // An UPDATE yields no ResultSet, so the statement is the only resource to release.
+      closeStmt(pStmt);
+    }
+  }
+
 /** * This will remove an entry from the queue after * it has been compacted.
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 0d7d573..c9c09ad 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -3703,6 +3703,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return Character.toString(s); } } + @RetrySemantics.ReadOnly public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException { ShowCompactResponse response = new ShowCompactResponse(new ArrayList<>()); @@ -3712,15 +3713,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", \"CQ_STATE\", \"CQ_TYPE\", \"CQ_WORKER_ID\", " + - //-1 because 'null' literal doesn't work for all DBs... - "\"CQ_START\", -1 \"CC_END\", \"CQ_RUN_AS\", \"CQ_HADOOP_JOB_ID\", \"CQ_ID\", \"CQ_ERROR_MESSAGE\", " + - "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" " + - "FROM \"COMPACTION_QUEUE\" UNION ALL " + - "SELECT \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " + - "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " + - "\"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\" " + - " FROM \"COMPLETED_COMPACTIONS\""; //todo: sort by cq_id? + String s = "" + + //-1 because 'null' literal doesn't work for all DBs... 
+ "SELECT " + + " \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", \"CQ_STATE\", \"CQ_TYPE\", \"CQ_WORKER_ID\", " + + " \"CQ_START\", -1 \"CC_END\", \"CQ_RUN_AS\", \"CQ_HADOOP_JOB_ID\", \"CQ_ID\", \"CQ_ERROR_MESSAGE\", " + + " \"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", " + + " \"CQ_CLEANER_START\"" + + "FROM " + + " \"COMPACTION_QUEUE\" " + + "UNION ALL " + + "SELECT " + + " \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " + + " \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " + + " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " + + " -1 " + + "FROM " + + " \"COMPLETED_COMPACTIONS\""; //todo: sort by cq_id? //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013) //to sort so that currently running jobs are at the end of the list (bottom of screen) //and currently running ones are in sorted by start time @@ -3740,11 +3749,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } e.setWorkerid(rs.getString(6)); long start = rs.getLong(7); - if(!rs.wasNull()) { + if (!rs.wasNull()) { e.setStart(start); } long endTime = rs.getLong(8); - if(endTime != -1) { + if (endTime != -1) { e.setEndTime(endTime); } e.setRunAs(rs.getString(9)); @@ -3752,18 +3761,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { e.setId(rs.getLong(11)); e.setErrorMessage(rs.getString(12)); long enqueueTime = rs.getLong(13); - if(!rs.wasNull()) { + if (!rs.wasNull()) { e.setEnqueueTime(enqueueTime); } e.setWorkerVersion(rs.getString(14)); e.setInitiatorId(rs.getString(15)); e.setInitiatorVersion(rs.getString(16)); + long cleanerStart = rs.getLong(17); + if (!rs.wasNull() && (cleanerStart != -1)) { + e.setCleanerStart(cleanerStart); + } response.addToCompacts(e); } } catch (SQLException e) { checkRetryable(e, 
"showCompact(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index a0af4a8..d325765 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -24,8 +24,52 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.RetrySemantics; -import org.apache.hadoop.hive.metastore.api.*; -import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest; +import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest; +import 
org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.Materialization; +import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest; +import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; +import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import 
org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import java.sql.SQLException; @@ -382,7 +426,7 @@ public interface TxnStore extends Configurable { * @param compactionTxnId - txnid in which Compactor is running */ @RetrySemantics.Idempotent - public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws MetaException; + void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws MetaException; /** * This will grab the next compaction request off of @@ -423,6 +467,22 @@ public interface TxnStore extends Configurable { List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long retentionTime) throws MetaException; /** + * Sets the cleaning start time for a particular compaction + * + * @param info info on the compaction entry + */ + @RetrySemantics.CannotRetry + void markCleanerStart(CompactionInfo info) throws MetaException; + + /** + * Removes the cleaning start time for a particular compaction + * + * @param info info on the compaction entry + */ + @RetrySemantics.CannotRetry + void clearCleanerStart(CompactionInfo info) throws MetaException; + + /** * This will remove an entry from the queue after * it has been compacted. 
* diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 0d4ad17..5c49580 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -628,7 +628,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_COMMIT_TIME bigint, CQ_INITIATOR_ID varchar(128), CQ_INITIATOR_VERSION varchar(128), - CQ_WORKER_VERSION varchar(128) + CQ_WORKER_VERSION varchar(128), + CQ_CLEANER_START bigint ); CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 86c49c4..37e42d8 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -182,5 +182,8 @@ ALTER TABLE "APP"."MV_TABLES_USED" ADD COLUMN "UPDATED_COUNT" BIGINT NOT NULL DE ALTER TABLE "APP"."MV_TABLES_USED" ADD COLUMN "DELETED_COUNT" BIGINT NOT NULL DEFAULT 0; ALTER TABLE "APP"."MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY KEY ("TBL_ID", "MV_CREATION_METADATA_ID"); +-- HIVE-25737 +ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint; + -- This needs to be the last thing done. Insert any changes above this line. 
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index 6a3b174..72a402a 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1048,6 +1048,7 @@ CREATE TABLE COMPACTION_QUEUE( CQ_INITIATOR_ID nvarchar(128) NULL, CQ_INITIATOR_VERSION nvarchar(128) NULL, CQ_WORKER_VERSION nvarchar(128) NULL, + CQ_CLEANER_START bigint NULL, PRIMARY KEY CLUSTERED ( CQ_ID ASC diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index 296cca5..5460d7a 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -233,6 +233,9 @@ ALTER TABLE "MV_TABLES_USED" ADD "UPDATED_COUNT" BIGINT NOT NULL DEFAULT 0; ALTER TABLE "MV_TABLES_USED" ADD "DELETED_COUNT" BIGINT NOT NULL DEFAULT 0; ALTER TABLE "MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY KEY ("TBL_ID", "MV_CREATION_METADATA_ID"); +-- HIVE-25737 +ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint NULL; + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 0cbc091..2ab4cf6 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -1089,7 +1089,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_COMMIT_TIME bigint, CQ_INITIATOR_ID varchar(128), CQ_INITIATOR_VERSION varchar(128), - CQ_WORKER_VERSION varchar(128) + CQ_WORKER_VERSION varchar(128), + CQ_CLEANER_START bigint ) ENGINE=InnoDB DEFAULT CHARSET=latin1; CREATE TABLE COMPLETED_COMPACTIONS ( diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index 1d3a1f06..89b6a91 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -210,6 +210,9 @@ ALTER TABLE `MV_TABLES_USED` ADD COLUMN `UPDATED_COUNT` bigint(20) NOT NULL DEFA ALTER TABLE `MV_TABLES_USED` ADD COLUMN `DELETED_COUNT` bigint(20) NOT NULL DEFAULT 0; ALTER TABLE `MV_TABLES_USED` ADD CONSTRAINT `MV_TABLES_USED_PK` PRIMARY KEY (`TBL_ID`, `MV_CREATION_METADATA_ID`); +-- HIVE-25737 +ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint; + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index a208154..055f101 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -1091,7 +1091,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_COMMIT_TIME NUMBER(19), CQ_INITIATOR_ID varchar(128), CQ_INITIATOR_VERSION varchar(128), - CQ_WORKER_VERSION varchar(128) + CQ_WORKER_VERSION varchar(128), + CQ_CLEANER_START NUMBER(19) ) ROWDEPENDENCIES; CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index d5810de..cce71a3 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -207,7 +207,9 @@ ALTER TABLE MV_TABLES_USED ADD UPDATED_COUNT NUMBER DEFAULT 0 NOT NULL; ALTER TABLE MV_TABLES_USED ADD DELETED_COUNT NUMBER DEFAULT 0 NOT NULl; ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_PK PRIMARY KEY (TBL_ID, MV_CREATION_METADATA_ID); +-- HIVE-25737 +ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START NUMBER(19); + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual; - diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index 6e83ff1..639f18d 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1800,7 +1800,8 @@ CREATE TABLE "COMPACTION_QUEUE" ( "CQ_COMMIT_TIME" bigint, "CQ_INITIATOR_ID" varchar(128), "CQ_INITIATOR_VERSION" varchar(128), - "CQ_WORKER_VERSION" varchar(128) + "CQ_WORKER_VERSION" varchar(128), + "CQ_CLEANER_START" bigint ); CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" ( diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index 83f8336..a2c0a81 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -344,7 +344,9 @@ ALTER TABLE "MV_TABLES_USED" ADD "UPDATED_COUNT" bigint NOT NULL DEFAULT 0; ALTER TABLE "MV_TABLES_USED" ADD "DELETED_COUNT" bigint NOT NULL DEFAULT 0; ALTER TABLE "MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY KEY ("TBL_ID", "MV_CREATION_METADATA_ID"); +-- HIVE-25737 +ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_CLEANER_START" bigint; + -- These lines need to be last. Insert any changes above. 
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0'; - diff --git a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql index 21f8061..e6507bd 100644 --- a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql @@ -115,6 +115,9 @@ ALTER TABLE "DBS" ADD "DATACONNECTOR_NAME" character varying(128); ALTER TABLE "DBS" ADD "REMOTE_DBNAME" character varying(128); UPDATE "DBS" SET "TYPE"= 'NATIVE' WHERE "TYPE" IS NULL; +-- HIVE-25737 +ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_CLEANER_START" bigint; + -- These lines need to be last. Insert any changes above. UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.1.3000 to 4.0.0';