This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new e305109 HIVE-24291: Compaction cleaner should wait for all prev txn to commit (Peter Varga, reviewed by Karen Coppage, Denys Kuzmenko) e305109 is described below commit e305109ec3e0f2e8359e501633cee9f893bf471c Author: Peter Varga <pva...@cloudera.com> AuthorDate: Mon Nov 2 16:21:12 2020 +0100 HIVE-24291: Compaction cleaner should wait for all prev txn to commit (Peter Varga, reviewed by Karen Coppage, Denys Kuzmenko) Closes (#1592) --- .../hive/ql/txn/compactor/CompactorTestUtil.java | 5 + .../txn/compactor/TestCleanerWithReplication.java | 24 +---- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +- .../metastore/txn/TestCompactionTxnHandler.java | 30 +++--- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 12 ++- .../hadoop/hive/ql/TxnCommandsBaseForTests.java | 4 + .../hive/ql/txn/compactor/CompactorTest.java | 43 ++++++-- .../hadoop/hive/ql/txn/compactor/TestCleaner.java | 115 ++++++++++++--------- .../llap/cardinality_preserving_join_opt2.q.out | 2 +- .../hive/metastore/txn/CompactionTxnHandler.java | 64 +++++------- .../hadoop/hive/metastore/txn/TxnHandler.java | 56 ++++++---- .../apache/hadoop/hive/metastore/txn/TxnStore.java | 3 +- .../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 3 +- .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 3 + .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 3 +- .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 3 + .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 3 +- .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 3 + .../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 3 +- .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 3 + .../sql/postgres/hive-schema-4.0.0.postgres.sql | 3 +- .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 3 + 22 files changed, 232 insertions(+), 158 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java index 9db2229..9bc3c61 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.IDriver; @@ -52,6 +53,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -145,6 +147,9 @@ class CompactorTestUtil { * @throws Exception if cleaner cannot be started. 
*/ static void runCleaner(HiveConf hConf) throws Exception { + // Wait for the cooldown period so the Cleaner can see last committed txn as the highest committed watermark + Thread.sleep(MetastoreConf.getTimeVar(hConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS)); + HiveConf hiveConf = new HiveConf(hConf); Cleaner t = new Cleaner(); t.setThreadId((int) t.getId()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java index d956067..0a96502 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java @@ -111,11 +111,7 @@ public class TestCleanerWithReplication extends CompactorTest { burnThroughTransactions(dbName, "camtc", 25); CompactionRequest rqst = new CompactionRequest(dbName, "camtc", CompactionType.MAJOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); assertCleanerActions(6); } @@ -134,11 +130,7 @@ public class TestCleanerWithReplication extends CompactorTest { CompactionRequest rqst = new CompactionRequest(dbName, "campc", CompactionType.MAJOR); rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); assertCleanerActions(6); } @@ -155,11 +147,7 @@ public class TestCleanerWithReplication extends CompactorTest { burnThroughTransactions(dbName, "camitc", 25); CompactionRequest rqst = new 
CompactionRequest(dbName, "camitc", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); assertCleanerActions(4); } @@ -178,11 +166,7 @@ public class TestCleanerWithReplication extends CompactorTest { CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR); rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); assertCleanerActions(4); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index ae09088..550ce7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -97,7 +97,7 @@ public class Cleaner extends MetaStoreCompactorThread { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean(minOpenTxnId)) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); } diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 8d7c7d2..51ca05f 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ 
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -183,15 +183,15 @@ public class TestCompactionTxnHandler { CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); CompactionInfo ci = txnHandler.findNextToCompact("fred"); assertNotNull(ci); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); txnHandler.markCompacted(ci); assertNull(txnHandler.findNextToCompact("fred")); - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); + List<CompactionInfo> toClean = txnHandler.findReadyToClean(0); assertEquals(1, toClean.size()); assertNull(txnHandler.findNextToCompact("fred")); @@ -212,20 +212,20 @@ public class TestCompactionTxnHandler { CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); CompactionInfo ci = txnHandler.findNextToCompact("fred"); assertNotNull(ci); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); txnHandler.markCompacted(ci); assertNull(txnHandler.findNextToCompact("fred")); - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); + List<CompactionInfo> toClean = txnHandler.findReadyToClean(0); assertEquals(1, toClean.size()); assertNull(txnHandler.findNextToCompact("fred")); txnHandler.markCleaned(ci); assertNull(txnHandler.findNextToCompact("fred")); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); assertEquals(1, 
rsp.getCompactsSize()); @@ -262,11 +262,11 @@ public class TestCompactionTxnHandler { CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MINOR); rqst.setPartitionname(partitionName); txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); CompactionInfo ci = txnHandler.findNextToCompact(workerId); assertNotNull(ci); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); ci.errorMessage = errorMessage; txnHandler.markFailed(ci); assertNull(txnHandler.findNextToCompact(workerId)); @@ -501,12 +501,13 @@ public class TestCompactionTxnHandler { // Now clean them and check that they are removed from the count. CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR); txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, txnHandler.findReadyToClean(0).size()); ci = txnHandler.findNextToCompact("fred"); assertNotNull(ci); txnHandler.markCompacted(ci); - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + List<CompactionInfo> toClean = txnHandler.findReadyToClean(0); assertEquals(1, toClean.size()); txnHandler.markCleaned(ci); @@ -516,21 +517,20 @@ public class TestCompactionTxnHandler { // Create one aborted for low water mark txnid = openTxn(); txnHandler.abortTxn(new AbortTxnRequest(txnid)); - txnHandler.setOpenTxnTimeOutMillis(1); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.setOpenTxnTimeOutMillis(1000); rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); rqst.setPartitionname("bar"); txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); + assertEquals(0, 
txnHandler.findReadyToClean(0).size()); ci = txnHandler.findNextToCompact("fred"); assertNotNull(ci); txnHandler.markCompacted(ci); - toClean = txnHandler.findReadyToClean(); + toClean = txnHandler.findReadyToClean(0); assertEquals(1, toClean.size()); txnHandler.markCleaned(ci); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index be44951..7e4d3df 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -2110,19 +2110,23 @@ public class TestTxnCommands2 { Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause)); - // The cleaner will removed aborted txns data/metadata but cannot remove aborted txn2 from TXN_TO_WRITE_ID - // as there is a open txn < aborted txn2. The aborted txn1 < open txn and will be removed. - // Also, committed txn > open txn is retained. + // The cleaner after the compaction will not run, since the open txnId < compaction commit txnId + // Since aborted txns data/metadata was not removed, all data in TXN_TO_WRITE_ID should remain txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), - 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); + 3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); // Commit the open txn, which lets the cleanup on TXN_TO_WRITE_ID. 
txnMgr.commitTxn(); + txnMgr.openTxn(ctx, "u1"); + txnMgr.getValidTxns(); + // The txn opened after the compaction commit should not effect the Cleaner + runCleaner(hiveConf); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index e49cc55..976c604 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -24,6 +24,7 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClientWithLocalCache; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -206,6 +208,8 @@ public abstract class TxnCommandsBaseForTests { runCompactorThread(hiveConf, CompactorThreadType.WORKER); } public static void runCleaner(HiveConf hiveConf) throws Exception { + // Wait for the cooldown period so the Cleaner can see the last committed txn as the highest committed watermark + Thread.sleep(MetastoreConf.getTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS)); runCompactorThread(hiveConf, CompactorThreadType.CLEANER); } public static void runInitiator(HiveConf hiveConf) throws Exception { diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 54e9529..e6272f6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; @@ -45,7 +46,10 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -82,6 +86,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Stack; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -187,8 +192,13 @@ public abstract class CompactorTest { } protected long openTxn() throws MetaException { - List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, System.getProperty("user.name"), - Worker.hostname())).getTxn_ids(); + return 
openTxn(TxnType.DEFAULT); + } + + protected long openTxn(TxnType txnType) throws MetaException { + OpenTxnRequest rqst = new OpenTxnRequest(1, System.getProperty("user.name"), Worker.hostname()); + rqst.setTxn_type(txnType); + List<Long> txns = txnHandler.openTxns(rqst).getTxn_ids(); return txns.get(0); } @@ -214,6 +224,9 @@ public abstract class CompactorTest { protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception { addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true); } + protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, long visibilityId) throws Exception { + addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true, visibilityId); + } protected void addLegacyFile(Table t, Partition p, int numRecords) throws Exception { addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true); @@ -316,16 +329,20 @@ public abstract class CompactorTest { return location; } - private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE}; + private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE} - private void addFile(Table t, Partition p, long minTxn, long maxTxn, - int numRecords, FileType type, int numBuckets, - boolean allBucketsPresent) throws Exception { + private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets, + boolean allBucketsPresent) throws Exception { + addFile(t, p, minTxn, maxTxn, numRecords, type, numBuckets, allBucketsPresent, 0); + } + + private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets, + boolean allBucketsPresent, long visibilityId) throws Exception { String partValue = (p == null) ? null : p.getValues().get(0); Path location = new Path(getLocation(t.getTableName(), partValue)); String filename = null; switch (type) { - case BASE: filename = "base_" + maxTxn; break; + case BASE: filename = AcidUtils.BASE_PREFIX + maxTxn + (visibilityId > 0 ? 
AcidUtils.VISIBILITY_PREFIX + visibilityId : ""); break; case LENGTH_FILE: // Fall through to delta case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break; case LEGACY: break; // handled below @@ -580,4 +597,16 @@ public abstract class CompactorTest { String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) { return AcidUtils.deleteDeltaSubdir(minTxnId, maxTxnId); } + + protected long compactInTxn(CompactionRequest rqst) throws Exception { + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + ci.runAs = System.getProperty("user.name"); + long compactTxn = openTxn(TxnType.COMPACTION); + txnHandler.updateCompactorState(ci, compactTxn); + txnHandler.markCompacted(ci); + txnHandler.commitTxn(new CommitTxnRequest(compactTxn)); + Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS)); + return compactTxn; + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index a806c16..d005373 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -19,12 +19,15 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import 
org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; @@ -34,6 +37,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Tests for the compactor Cleaner thread @@ -54,7 +58,36 @@ public class TestCleaner extends CompactorTest { addBaseFile(t, null, 20L, 20); addDeltaFile(t, null, 21L, 22L, 2); addDeltaFile(t, null, 23L, 24L, 2); - addBaseFile(t, null, 25L, 25); + + + burnThroughTransactions("default", "camtc", 25); + + CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); + long compactTxn = compactInTxn(rqst); + addBaseFile(t, null, 25L, 25, compactTxn); + + startCleaner(); + + // Check there are no compactions requests left. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompactsSize()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + + // Check that the files are removed + List<Path> paths = getDirectories(conf, t, null); + Assert.assertEquals(1, paths.size()); + Assert.assertEquals("base_25_v26", paths.get(0).getName()); + } + + + @Test + public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Exception { + Table t = newTable("default", "camtc", false); + + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + addBaseFile(t, null, 25L, 25, 26); burnThroughTransactions("default", "camtc", 25); @@ -62,20 +95,37 @@ public class TestCleaner extends CompactorTest { txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); + long compactTxn = openTxn(TxnType.COMPACTION); + txnHandler.updateCompactorState(ci, compactTxn); txnHandler.markCompacted(ci); + // Open a 
query during compaction + long longQuery = openTxn(); + txnHandler.commitTxn(new CommitTxnRequest(compactTxn)); startCleaner(); - // Check there are no compactions requests left. + // The long running query should prevent the cleanup ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); - // Check that the files are removed + // Check that the files are not removed List<Path> paths = getDirectories(conf, t, null); + Assert.assertEquals(4, paths.size()); + + // After the commit cleaning can proceed + txnHandler.commitTxn(new CommitTxnRequest(longQuery)); + Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS)); + startCleaner(); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompactsSize()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + + // Check that the files are removed + paths = getDirectories(conf, t, null); Assert.assertEquals(1, paths.size()); - Assert.assertEquals("base_25", paths.get(0).getName()); + Assert.assertEquals("base_25_v26", paths.get(0).getName()); } @Test @@ -92,18 +142,14 @@ public class TestCleaner extends CompactorTest { CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR); rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); startCleaner(); // Check there are no compactions requests left. 
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); // Check that the files are removed List<Path> paths = getDirectories(conf, t, p); @@ -123,18 +169,14 @@ public class TestCleaner extends CompactorTest { burnThroughTransactions("default", "camitc", 25); CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); startCleaner(); // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); // Check that the files are removed List<Path> paths = getDirectories(conf, t, null); @@ -163,18 +205,14 @@ public class TestCleaner extends CompactorTest { CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR); rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); startCleaner(); // Check there are no compactions requests left. 
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); // Check that the files are removed List<Path> paths = getDirectories(conf, t, p); @@ -202,18 +240,14 @@ public class TestCleaner extends CompactorTest { CompactionRequest rqst = new CompactionRequest("default", "campcnb", CompactionType.MAJOR); rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); + compactInTxn(rqst); startCleaner(); // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); // Check that the files are removed List<Path> paths = getDirectories(conf, t, p); @@ -232,11 +266,7 @@ public class TestCleaner extends CompactorTest { burnThroughTransactions("default", "dt", 25); CompactionRequest rqst = new CompactionRequest("default", "dt", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); // Drop table will clean the table entry from the compaction queue and hence cleaner have no effect ms.dropTable("default", "dt"); @@ -261,11 +291,7 @@ public class TestCleaner extends CompactorTest { CompactionRequest rqst = new CompactionRequest("default", "dp", 
CompactionType.MAJOR); rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); // Drop partition will clean the partition entry from the compaction queue and hence cleaner have no effect ms.dropPartition("default", "dp", Collections.singletonList("today"), true); @@ -296,11 +322,7 @@ public class TestCleaner extends CompactorTest { for(int i = 0; i < 10; i++) { CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR); rqst.setPartitionname("ds=today"+i); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); - txnHandler.updateCompactorState(ci, openTxn()); - txnHandler.markCompacted(ci); + compactInTxn(rqst); } conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM, 3); @@ -309,7 +331,7 @@ public class TestCleaner extends CompactorTest { // Check there are no compactions requests left. 
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(10, rsp.getCompactsSize()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); // Check that the files are removed for (Partition pa : partitions) { @@ -339,4 +361,5 @@ public class TestCleaner extends CompactorTest { public void tearDown() throws Exception { compactorTestCleanup(); } + } diff --git a/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out b/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out index 3e0aea6..bae07a0 100644 --- a/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out +++ b/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out @@ -239,7 +239,7 @@ HiveProject(c1=[$11], c5=[$13], c6=[$14], c3=[$1], c4=[$2], c51=[$3], c61=[$4], HiveFilter(condition=[IS NOT NULL($0)]) HiveTableScan(table=[[mydb_e10, d4_tab_e10]], table:alias=[r]) HiveProject(c1=[$0], c2=[$1], c4=[$3], c5=[$4]) - HiveFilter(condition=[AND(OR(IS NULL($4), >($4, 2020-10-01)), IS NOT NULL($1), IS NOT NULL($0))]) + HiveFilter(condition=[AND(OR(IS NULL($4), >($4, 2020-11-01)), IS NOT NULL($1), IS NOT NULL($0))]) HiveTableScan(table=[[mydb_e10, f2_tab_e10]], table:alias=[cm]) PREHOOK: query: drop table f_tab_e10 diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 5c6c94c..6950f0f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -20,6 +20,7 @@ package 
org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.util.StringUtils; @@ -236,8 +237,7 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " - + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = " - + "(SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\")" + + "\"CQ_WORKER_ID\" = NULL" + " WHERE \"CQ_ID\" = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); @@ -267,11 +267,12 @@ class CompactionTxnHandler extends TxnHandler { /** * Find entries in the queue that are ready to * be cleaned. + * @param minOpenTxnWaterMark Minimum open txnId * @return information on the entry in the queue. 
*/ @Override @RetrySemantics.ReadOnly - public List<CompactionInfo> findReadyToClean() throws MetaException { + public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark) throws MetaException { Connection dbConn = null; List<CompactionInfo> rc = new ArrayList<>(); @@ -281,9 +282,16 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + - "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" " + - "WHERE \"CQ_STATE\" = '" + READY_FOR_CLEANING + "'"; + /* + * By filtering on minOpenTxnWaterMark, we will only cleanup after every transaction is committed, that could see + * the uncompacted deltas. This way the cleaner can clean up everything that was made obsolete by this compaction. + */ + String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + + READY_FOR_CLEANING + "'"; + if (minOpenTxnWaterMark > 0) { + s = s + " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)"; + } LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -313,7 +321,7 @@ class CompactionTxnHandler extends TxnHandler { close(rs, stmt, dbConn); } } catch (RetryException e) { - return findReadyToClean(); + return findReadyToClean(minOpenTxnWaterMark); } } @@ -747,7 +755,7 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = " + - ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + + ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + ", \"CQ_TXN_ID\" = " + compactionTxnId + " WHERE \"CQ_ID\" = " + ci.id; 
if(LOG.isDebugEnabled()) { LOG.debug("About to execute: " + sqlText); @@ -1120,37 +1128,12 @@ class CompactionTxnHandler extends TxnHandler { @Override @RetrySemantics.Idempotent - public long findMinOpenTxnIdForCleaner() throws MetaException{ + public long findMinOpenTxnIdForCleaner() throws MetaException { Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN; - LOG.debug("Going to execute query <" + query + ">"); - rs = stmt.executeQuery(query); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly initialized."); - } - long numOpenTxns = rs.getLong(1); - if (numOpenTxns > 0) { - query = "SELECT MIN(\"RES\".\"ID\") FROM (" + - "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN + - " UNION " + - "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = " - + quoteChar(READY_FOR_CLEANING) + - ") \"RES\""; - } else { - query = "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\""; - } - LOG.debug("Going to execute query <" + query + ">"); - rs = stmt.executeQuery(query); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); - } - return rs.getLong(1); + return getMinOpenTxnIdWaterMark(dbConn); } catch (SQLException e) { LOG.error("Unable to getMinOpenTxnIdForCleaner", e); rollbackDBConn(dbConn); @@ -1158,12 +1141,21 @@ class CompactionTxnHandler extends TxnHandler { throw new MetaException("Unable to execute getMinOpenTxnIfForCleaner() " + StringUtils.stringifyException(e)); } finally { - close(rs, stmt, dbConn); + closeDbConn(dbConn); } } catch (RetryException e) { return findMinOpenTxnIdForCleaner(); } } + + protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType 
txnType, Long commitId, + long tempId) throws SQLException { + super.updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType, commitId, tempId); + if (txnType == TxnType.COMPACTION) { + stmt.executeUpdate( + "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + " WHERE \"CQ_TXN_ID\" = " + txnid); + } + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 0225db2..e0a78aa 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1337,6 +1337,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } } + } else if (txnType.get() == TxnType.COMPACTION) { + acquireTxnLock(stmt, false); + commitId = getHighWaterMark(stmt); } else { /* * current txn didn't update/delete anything (may have inserted), so just proceed with commit @@ -1508,7 +1511,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } - private void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, + /** + * See overridden method in CompactionTxnHandler also. 
+ */ + protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId, long tempId) throws SQLException { List<String> queryBatch = new ArrayList<>(5); // update write_set with real commitId @@ -1526,7 +1532,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (txnType == TxnType.MATER_VIEW_REBUILD) { queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); } - // execute all in one batch executeQueriesInBatchNoCount(dbProduct, stmt, queryBatch, maxBatchSize); } @@ -2228,36 +2233,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { public void performWriteSetGC() throws MetaException { Connection dbConn = null; Statement stmt = null; - ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - - long minOpenTxn; - rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + TxnStatus.OPEN); - if (!rs.next()) { - throw new IllegalStateException("Scalar query returned no rows?!?!!"); - } - minOpenTxn = rs.getLong(1); - if (rs.wasNull()) { - minOpenTxn = Long.MAX_VALUE; - } - long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); - /** - * We try to find the highest transactionId below everything was committed or aborted. - * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary, - * because it is guaranteed there won't be open transactions below that. 
- */ - long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1); - LOG.debug("Perform WriteSet GC with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark); + long commitHighWaterMark = getMinOpenTxnIdWaterMark(dbConn); int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark); LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt); dbConn.commit(); } catch (SQLException ex) { LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); } finally { - close(rs, stmt, dbConn); + close(null, stmt, dbConn); + } + } + + protected long getMinOpenTxnIdWaterMark(Connection dbConn) throws MetaException, SQLException { + /** + * We try to find the highest transactionId below everything was committed or aborted. + * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary, + * because it is guaranteed there won't be open transactions below that. + */ + long minOpenTxn; + try (Statement stmt = dbConn.createStatement()) { + try (ResultSet rs = stmt + .executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + TxnStatus.OPEN)) { + if (!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + minOpenTxn = rs.getLong(1); + if (rs.wasNull()) { + minOpenTxn = Long.MAX_VALUE; + } + } } + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + + LOG.debug("MinOpenTxnIdWaterMark calculated with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark); + return Long.min(minOpenTxn, lowWaterMark + 1); } /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 6cc38b2..d11b88a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -385,10 +385,11 @@ public interface TxnStore extends Configurable { /** * Find entries in the queue that are ready to * be cleaned. + * @param minOpenTxnWaterMark Minimum open txnId * @return information on the entry in the queue. */ @RetrySemantics.ReadOnly - List<CompactionInfo> findReadyToClean() throws MetaException; + List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark) throws MetaException; /** * This will remove an entry from the queue after diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 02e960f..bc239e7 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -609,7 +609,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_META_INFO varchar(2048) for bit data, CQ_HADOOP_JOB_ID varchar(32), CQ_ERROR_MESSAGE clob, - CQ_NEXT_TXN_ID bigint + CQ_NEXT_TXN_ID bigint, + CQ_TXN_ID bigint ); CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 9d61cbb..2ef5ad9 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -115,5 +115,8 @@ CREATE TABLE "APP"."STORED_PROCS" ( CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID"); ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID"); +-- HIVE-24291 +ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID bigint; + -- This needs to be the last thing done. 
Insert any changes above this line. UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index 4478409..7f9ef92 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1017,7 +1017,8 @@ CREATE TABLE COMPACTION_QUEUE( CQ_META_INFO varbinary(2048) NULL, CQ_HADOOP_JOB_ID nvarchar(128) NULL, CQ_ERROR_MESSAGE varchar(max) NULL, - CQ_NEXT_TXN_ID bigint NOT NULL, + CQ_NEXT_TXN_ID bigint NULL, + CQ_TXN_ID bigint NULL, PRIMARY KEY CLUSTERED ( CQ_ID ASC diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index 33b6587..148d089 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -151,6 +151,9 @@ CREATE TABLE "STORED_PROCS" ( CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID"); ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID"); +-- HIVE-24291 +ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID bigint NULL; + -- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index b2e6983..88c9576 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -1077,7 +1077,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_META_INFO varbinary(2048), CQ_HADOOP_JOB_ID varchar(32), CQ_ERROR_MESSAGE mediumtext, - CQ_NEXT_TXN_ID bigint + CQ_NEXT_TXN_ID bigint, + CQ_TXN_ID bigint ) ENGINE=InnoDB DEFAULT CHARSET=latin1; CREATE TABLE COMPLETED_COMPACTIONS ( diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index 544ec15..039e2bf 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -123,6 +123,9 @@ CREATE TABLE STORED_PROCS ( CREATE UNIQUE INDEX UNIQUESTOREDPROC ON STORED_PROCS (NAME, DB_ID); ALTER TABLE `STORED_PROCS` ADD CONSTRAINT `STOREDPROC_FK1` FOREIGN KEY (`DB_ID`) REFERENCES DBS (`DB_ID`); +-- HIVE-24291 +ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID bigint; + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index cde4b90..39acc71 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -1058,7 +1058,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_META_INFO BLOB, CQ_HADOOP_JOB_ID varchar2(32), CQ_ERROR_MESSAGE CLOB, - CQ_NEXT_TXN_ID NUMBER(19) + CQ_NEXT_TXN_ID NUMBER(19), + CQ_TXN_ID NUMBER(19) ) ROWDEPENDENCIES; CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index d83d722..59b645e 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -120,6 +120,9 @@ CREATE TABLE "STORED_PROCS" ( CREATE UNIQUE INDEX UNIQUESTOREDPROC ON STORED_PROCS ("NAME", "DB_ID"); ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID"); +-- HIVE-24291 +ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID NUMBER(19); + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual; diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index 57b323c..b673f14 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1743,7 +1743,8 @@ CREATE TABLE "COMPACTION_QUEUE" ( "CQ_META_INFO" bytea, "CQ_HADOOP_JOB_ID" varchar(32), "CQ_ERROR_MESSAGE" text, - "CQ_NEXT_TXN_ID" bigint + "CQ_NEXT_TXN_ID" bigint, + "CQ_TXN_ID" bigint ); CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" ( diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index 8b3792c..be23744 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -247,6 +247,9 @@ CREATE TABLE "STORED_PROCS" ( CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID"); ALTER TABLE ONLY "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID") DEFERRABLE; +-- HIVE-24291 +ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_TXN_ID" bigint; + -- These lines need to be last. Insert any changes above. UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';