This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e305109  HIVE-24291: Compaction cleaner should wait for all prev txn to commit (Peter Varga, reviewed by Karen Coppage, Denys Kuzmenko)
e305109 is described below

commit e305109ec3e0f2e8359e501633cee9f893bf471c
Author: Peter Varga <pva...@cloudera.com>
AuthorDate: Mon Nov 2 16:21:12 2020 +0100

    HIVE-24291: Compaction cleaner should wait for all prev txn to commit (Peter Varga, reviewed by Karen Coppage, Denys Kuzmenko)
    
    Closes (#1592)
---
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |   5 +
 .../txn/compactor/TestCleanerWithReplication.java  |  24 +----
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |   2 +-
 .../metastore/txn/TestCompactionTxnHandler.java    |  30 +++---
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  12 ++-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   4 +
 .../hive/ql/txn/compactor/CompactorTest.java       |  43 ++++++--
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 115 ++++++++++++---------
 .../llap/cardinality_preserving_join_opt2.q.out    |   2 +-
 .../hive/metastore/txn/CompactionTxnHandler.java   |  64 +++++-------
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  56 ++++++----
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |   3 +-
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |   3 +-
 .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql     |   3 +
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |   3 +-
 .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql     |   3 +
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |   3 +-
 .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql     |   3 +
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |   3 +-
 .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql   |   3 +
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |   3 +-
 .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql   |   3 +
 22 files changed, 232 insertions(+), 158 deletions(-)

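Mechanically, the change works like this: a compaction now runs in its own transaction of
type COMPACTION; when that transaction commits, its commit id is recorded on the queue entry
as CQ_NEXT_TXN_ID, and the Cleaner only picks up entries whose CQ_NEXT_TXN_ID is at or below
the minimum open-transaction watermark. A standalone sketch of that gating rule (illustrative
names only, not Hive code):

    import java.util.Collections;
    import java.util.Set;

    public class CleanerGateSketch {

      /** Lowest open txn id, or highWaterMark + 1 when nothing is open. */
      static long minOpenTxnWaterMark(Set<Long> openTxnIds, long highWaterMark) {
        return openTxnIds.stream().min(Long::compare).orElse(highWaterMark + 1);
      }

      /** Mirrors the new WHERE clause: CQ_NEXT_TXN_ID <= watermark OR CQ_NEXT_TXN_ID IS NULL. */
      static boolean readyToClean(Long cqNextTxnId, long watermark) {
        return cqNextTxnId == null || cqNextTxnId <= watermark;
      }

      public static void main(String[] args) {
        // Compaction committed as txn 26; a query opened afterwards (txn 27) does not block cleaning.
        System.out.println(readyToClean(26L, minOpenTxnWaterMark(Collections.singleton(27L), 30L))); // true
        // A query opened before the compaction committed (txn 25) may still read the old deltas.
        System.out.println(readyToClean(26L, minOpenTxnWaterMark(Collections.singleton(25L), 30L))); // false
      }
    }
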
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 9db2229..9bc3c61 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.IDriver;
@@ -52,6 +53,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -145,6 +147,9 @@ class CompactorTestUtil {
    * @throws Exception if cleaner cannot be started.
    */
   static void runCleaner(HiveConf hConf) throws Exception {
+    // Wait for the cooldown period so the Cleaner can see the last committed txn as the highest committed watermark
+    Thread.sleep(MetastoreConf.getTimeVar(hConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+
     HiveConf hiveConf = new HiveConf(hConf);
     Cleaner t = new Cleaner();
     t.setThreadId((int) t.getId());
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index d956067..0a96502 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -111,11 +111,7 @@ public class TestCleanerWithReplication extends CompactorTest {
     burnThroughTransactions(dbName, "camtc", 25);
 
     CompactionRequest rqst = new CompactionRequest(dbName, "camtc", CompactionType.MAJOR);
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     assertCleanerActions(6);
   }
@@ -134,11 +130,7 @@ public class TestCleanerWithReplication extends CompactorTest {
 
     CompactionRequest rqst = new CompactionRequest(dbName, "campc", CompactionType.MAJOR);
     rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     assertCleanerActions(6);
   }
@@ -155,11 +147,7 @@ public class TestCleanerWithReplication extends CompactorTest {
     burnThroughTransactions(dbName, "camitc", 25);
 
     CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     assertCleanerActions(4);
   }
@@ -178,11 +166,7 @@ public class TestCleanerWithReplication extends CompactorTest {
 
     CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR);
     rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     assertCleanerActions(4);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index ae09088..550ce7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -97,7 +97,7 @@ public class Cleaner extends MetaStoreCompactorThread {
           long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
           LOG.info("Cleaning based on min open txn id: " + minOpenTxnId);
           List<CompletableFuture> cleanerList = new ArrayList<>();
-          for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+          for (CompactionInfo compactionInfo : txnHandler.findReadyToClean(minOpenTxnId)) {
             cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
                   clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
           }
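
On the consumer side, the hunk above computes the watermark once per cleaner cycle and reuses
it both to filter the queue and as the bound handed to clean(). A minimal standalone sketch of
that shape (the executor wiring and error handling here are illustrative, not the real thread):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class CleanerCycleSketch {
      public static void main(String[] args) {
        long minOpenTxnId = 27L; // computed once per cycle, e.g. via findMinOpenTxnIdForCleaner()
        List<Long> readyToClean = Arrays.asList(24L, 26L); // entries passing the CQ_NEXT_TXN_ID filter
        CompletableFuture<?>[] cleaners = readyToClean.stream()
            .map(id -> CompletableFuture.runAsync(
                () -> System.out.println("clean compaction " + id + " with bound " + minOpenTxnId)))
            .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(cleaners).join(); // in this sketch, wait for the batch before the next cycle
      }
    }
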
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 8d7c7d2..51ca05f 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -183,15 +183,15 @@ public class TestCompactionTxnHandler {
     CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
     assertNotNull(ci);
 
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     txnHandler.markCompacted(ci);
     assertNull(txnHandler.findNextToCompact("fred"));
 
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+    List<CompactionInfo> toClean = txnHandler.findReadyToClean(0);
     assertEquals(1, toClean.size());
     assertNull(txnHandler.findNextToCompact("fred"));
 
@@ -212,20 +212,20 @@ public class TestCompactionTxnHandler {
     CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
     assertNotNull(ci);
 
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     txnHandler.markCompacted(ci);
     assertNull(txnHandler.findNextToCompact("fred"));
 
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+    List<CompactionInfo> toClean = txnHandler.findReadyToClean(0);
     assertEquals(1, toClean.size());
     assertNull(txnHandler.findNextToCompact("fred"));
     txnHandler.markCleaned(ci);
     assertNull(txnHandler.findNextToCompact("fred"));
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     assertEquals(1, rsp.getCompactsSize());
@@ -262,11 +262,11 @@ public class TestCompactionTxnHandler {
     CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MINOR);
     rqst.setPartitionname(partitionName);
     txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     CompactionInfo ci = txnHandler.findNextToCompact(workerId);
     assertNotNull(ci);
 
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     ci.errorMessage = errorMessage;
     txnHandler.markFailed(ci);
     assertNull(txnHandler.findNextToCompact(workerId));
@@ -501,12 +501,13 @@ public class TestCompactionTxnHandler {
     // Now clean them and check that they are removed from the count.
     CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
     txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     ci = txnHandler.findNextToCompact("fred");
     assertNotNull(ci);
     txnHandler.markCompacted(ci);
 
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+    Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+    List<CompactionInfo> toClean = txnHandler.findReadyToClean(0);
     assertEquals(1, toClean.size());
     txnHandler.markCleaned(ci);
 
@@ -516,21 +517,20 @@ public class TestCompactionTxnHandler {
     // Create one aborted for low water mark
     txnid = openTxn();
     txnHandler.abortTxn(new AbortTxnRequest(txnid));
-    txnHandler.setOpenTxnTimeOutMillis(1);
+    Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
     txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnList = txnHandler.getOpenTxns();
     assertEquals(3, txnList.getOpen_txnsSize());
-    txnHandler.setOpenTxnTimeOutMillis(1000);
 
     rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
     rqst.setPartitionname("bar");
     txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
+    assertEquals(0, txnHandler.findReadyToClean(0).size());
     ci = txnHandler.findNextToCompact("fred");
     assertNotNull(ci);
     txnHandler.markCompacted(ci);
 
-    toClean = txnHandler.findReadyToClean();
+    toClean = txnHandler.findReadyToClean(0);
     assertEquals(1, toClean.size());
     txnHandler.markCleaned(ci);
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index be44951..7e4d3df 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -2110,19 +2110,23 @@ public class TestTxnCommands2 {
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
             0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
 
-    // The cleaner will removed aborted txns data/metadata but cannot remove aborted txn2 from TXN_TO_WRITE_ID
-    // as there is a open txn < aborted txn2. The aborted txn1 < open txn and will be removed.
-    // Also, committed txn > open txn is retained.
+    // The cleaner after the compaction will not run, since the open txnId < compaction commit txnId
+    // Since aborted txns data/metadata was not removed, all data in TXN_TO_WRITE_ID should remain
     txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
     runWorker(hiveConf);
     runCleaner(hiveConf);
     txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnHandler.cleanTxnToWriteIdTable();
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
-            2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
+            3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
 
     // Commit the open txn, which lets the cleanup on TXN_TO_WRITE_ID.
     txnMgr.commitTxn();
+    txnMgr.openTxn(ctx, "u1");
+    txnMgr.getValidTxns();
+    // The txn opened after the compaction commit should not affect the Cleaner
+    runCleaner(hiveConf);
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnHandler.cleanTxnToWriteIdTable();
 
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index e49cc55..976c604 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClientWithLocalCache;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -206,6 +208,8 @@ public abstract class TxnCommandsBaseForTests {
     runCompactorThread(hiveConf, CompactorThreadType.WORKER);
   }
   public static void runCleaner(HiveConf hiveConf) throws Exception {
+    // Wait for the cooldown period so the Cleaner can see the last committed txn as the highest committed watermark
+    Thread.sleep(MetastoreConf.getTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runCompactorThread(hiveConf, CompactorThreadType.CLEANER);
   }
   public static void runInitiator(HiveConf hiveConf) throws Exception {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 54e9529..e6272f6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
@@ -45,7 +46,10 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -82,6 +86,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -187,8 +192,13 @@ public abstract class CompactorTest {
   }
 
   protected long openTxn() throws MetaException {
-    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, System.getProperty("user.name"),
-        Worker.hostname())).getTxn_ids();
+    return openTxn(TxnType.DEFAULT);
+  }
+
+  protected long openTxn(TxnType txnType) throws MetaException {
+    OpenTxnRequest rqst = new OpenTxnRequest(1, System.getProperty("user.name"), Worker.hostname());
+    rqst.setTxn_type(txnType);
+    List<Long> txns = txnHandler.openTxns(rqst).getTxn_ids();
     return txns.get(0);
   }
 
@@ -214,6 +224,9 @@ public abstract class CompactorTest {
   protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception {
     addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
   }
+  protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, long visibilityId) throws Exception {
+    addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true, visibilityId);
+  }
 
   protected void addLegacyFile(Table t, Partition p, int numRecords) throws Exception {
     addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
@@ -316,16 +329,20 @@ public abstract class CompactorTest {
     return location;
   }
 
-  private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE};
+  private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE}
 
-  private void addFile(Table t, Partition p, long minTxn, long maxTxn,
-                       int numRecords,  FileType type, int numBuckets,
-                       boolean allBucketsPresent) throws Exception {
+  private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets,
+      boolean allBucketsPresent) throws Exception {
+    addFile(t, p, minTxn, maxTxn, numRecords, type, numBuckets, allBucketsPresent, 0);
+  }
+
+  private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets,
+      boolean allBucketsPresent, long visibilityId) throws Exception {
     String partValue = (p == null) ? null : p.getValues().get(0);
     Path location = new Path(getLocation(t.getTableName(), partValue));
     String filename = null;
     switch (type) {
-      case BASE: filename = "base_" + maxTxn; break;
+      case BASE: filename = AcidUtils.BASE_PREFIX + maxTxn + (visibilityId > 0 ? AcidUtils.VISIBILITY_PREFIX + visibilityId : ""); break;
       case LENGTH_FILE: // Fall through to delta
       case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
       case LEGACY: break; // handled below
@@ -580,4 +597,16 @@ public abstract class CompactorTest {
   String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
     return AcidUtils.deleteDeltaSubdir(minTxnId, maxTxnId);
   }
+
+  protected long compactInTxn(CompactionRequest rqst) throws Exception {
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    long compactTxn = openTxn(TxnType.COMPACTION);
+    txnHandler.updateCompactorState(ci, compactTxn);
+    txnHandler.markCompacted(ci);
+    txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
+    Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    return compactTxn;
+  }
 }
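
Two details of this helper drive the tests that follow: compactInTxn() commits the compaction's
own COMPACTION-type transaction (so CQ_NEXT_TXN_ID gets stamped at commit) and then sleeps out
TXN_OPENTXN_TIMEOUT, and the new addBaseFile() overload names the compacted base with a
visibility suffix. A standalone sketch of that naming (the prefix constants mirror
AcidUtils.BASE_PREFIX and AcidUtils.VISIBILITY_PREFIX but are redeclared here so the sketch
runs on its own):

    public class BaseDirNameSketch {
      static final String BASE_PREFIX = "base_";      // assumed to match AcidUtils.BASE_PREFIX
      static final String VISIBILITY_PREFIX = "_v";   // assumed to match AcidUtils.VISIBILITY_PREFIX

      static String baseDirName(long maxTxn, long visibilityTxnId) {
        return BASE_PREFIX + maxTxn + (visibilityTxnId > 0 ? VISIBILITY_PREFIX + visibilityTxnId : "");
      }

      public static void main(String[] args) {
        System.out.println(baseDirName(25, 26)); // base_25_v26, as asserted in TestCleaner below
        System.out.println(baseDirName(25, 0));  // base_25 (no visibility suffix)
      }
    }
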
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index a806c16..d005373 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -19,12 +19,15 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
@@ -34,6 +37,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Tests for the compactor Cleaner thread
@@ -54,7 +58,36 @@ public class TestCleaner extends CompactorTest {
     addBaseFile(t, null, 20L, 20);
     addDeltaFile(t, null, 21L, 22L, 2);
     addDeltaFile(t, null, 23L, 24L, 2);
-    addBaseFile(t, null, 25L, 25);
+
+
+    burnThroughTransactions("default", "camtc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    startCleaner();
+
+    // Check there are no compactions requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25_v26", paths.get(0).getName());
+  }
+
+
+  @Test
+  public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Exception {
+    Table t = newTable("default", "camtc", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    addBaseFile(t, null, 25L, 25, 26);
 
     burnThroughTransactions("default", "camtc", 25);
 
@@ -62,20 +95,37 @@ public class TestCleaner extends CompactorTest {
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
     ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
+    long compactTxn = openTxn(TxnType.COMPACTION);
+    txnHandler.updateCompactorState(ci, compactTxn);
     txnHandler.markCompacted(ci);
+    // Open a query during compaction
+    long longQuery = openTxn();
+    txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
 
     startCleaner();
 
-    // Check there are no compactions requests left.
+    // The long running query should prevent the cleanup
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
 
-    // Check that the files are removed
+    // Check that the files are not removed
     List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(4, paths.size());
+
+    // After the commit cleaning can proceed
+    txnHandler.commitTxn(new CommitTxnRequest(longQuery));
+    Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    startCleaner();
+
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+
+    // Check that the files are removed
+    paths = getDirectories(conf, t, null);
     Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_25", paths.get(0).getName());
+    Assert.assertEquals("base_25_v26", paths.get(0).getName());
   }
 
   @Test
@@ -92,18 +142,14 @@ public class TestCleaner extends CompactorTest {
 
     CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
     rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     startCleaner();
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -123,18 +169,14 @@ public class TestCleaner extends CompactorTest {
     burnThroughTransactions("default", "camitc", 25);
 
     CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     startCleaner();
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -163,18 +205,14 @@ public class TestCleaner extends CompactorTest {
 
     CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
     rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     startCleaner();
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -202,18 +240,14 @@ public class TestCleaner extends CompactorTest {
 
     CompactionRequest rqst = new CompactionRequest("default", "campcnb", CompactionType.MAJOR);
     rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    txnHandler.markCompacted(ci);
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
+    compactInTxn(rqst);
 
     startCleaner();
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -232,11 +266,7 @@ public class TestCleaner extends CompactorTest {
     burnThroughTransactions("default", "dt", 25);
 
     CompactionRequest rqst = new CompactionRequest("default", "dt", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     // Drop table will clean the table entry from the compaction queue and hence cleaner have no effect
     ms.dropTable("default", "dt");
@@ -261,11 +291,7 @@ public class TestCleaner extends CompactorTest {
 
     CompactionRequest rqst = new CompactionRequest("default", "dp", CompactionType.MAJOR);
     rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    ci.runAs = System.getProperty("user.name");
-    txnHandler.updateCompactorState(ci, openTxn());
-    txnHandler.markCompacted(ci);
+    compactInTxn(rqst);
 
     // Drop partition will clean the partition entry from the compaction queue and hence cleaner have no effect
     ms.dropPartition("default", "dp", Collections.singletonList("today"), true);
@@ -296,11 +322,7 @@ public class TestCleaner extends CompactorTest {
     for(int i = 0; i < 10; i++) {
       CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
       rqst.setPartitionname("ds=today"+i);
-      txnHandler.compact(rqst);
-      CompactionInfo ci = txnHandler.findNextToCompact("fred");
-      ci.runAs = System.getProperty("user.name");
-      txnHandler.updateCompactorState(ci, openTxn());
-      txnHandler.markCompacted(ci);
+      compactInTxn(rqst);
     }
 
     conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM, 3);
@@ -309,7 +331,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(10, rsp.getCompactsSize());
-    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
 
     // Check that the files are removed
     for (Partition pa : partitions) {
@@ -339,4 +361,5 @@ public class TestCleaner extends CompactorTest {
   public void tearDown() throws Exception {
     compactorTestCleanup();
   }
+
 }
diff --git a/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out b/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out
index 3e0aea6..bae07a0 100644
--- a/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out
+++ b/ql/src/test/results/clientpositive/llap/cardinality_preserving_join_opt2.q.out
@@ -239,7 +239,7 @@ HiveProject(c1=[$11], c5=[$13], c6=[$14], c3=[$1], c4=[$2], c51=[$3], c61=[$4],
             HiveFilter(condition=[IS NOT NULL($0)])
               HiveTableScan(table=[[mydb_e10, d4_tab_e10]], table:alias=[r])
         HiveProject(c1=[$0], c2=[$1], c4=[$3], c5=[$4])
-          HiveFilter(condition=[AND(OR(IS NULL($4), >($4, 2020-10-01)), IS NOT NULL($1), IS NOT NULL($0))])
+          HiveFilter(condition=[AND(OR(IS NULL($4), >($4, 2020-11-01)), IS NOT NULL($1), IS NOT NULL($0))])
             HiveTableScan(table=[[mydb_e10, f2_tab_e10]], table:alias=[cm])
 
 PREHOOK: query: drop table f_tab_e10
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 5c6c94c..6950f0f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.util.StringUtils;
@@ -236,8 +237,7 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', "
-            + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = "
-            + "(SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\")"
+            + "\"CQ_WORKER_ID\" = NULL"
             + " WHERE \"CQ_ID\" = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
         int updCnt = stmt.executeUpdate(s);
@@ -267,11 +267,12 @@ class CompactionTxnHandler extends TxnHandler {
   /**
    * Find entries in the queue that are ready to
    * be cleaned.
+   * @param minOpenTxnWaterMark Minimum open txnId
    * @return information on the entry in the queue.
    */
   @Override
   @RetrySemantics.ReadOnly
-  public List<CompactionInfo> findReadyToClean() throws MetaException {
+  public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark) throws MetaException {
     Connection dbConn = null;
     List<CompactionInfo> rc = new ArrayList<>();
 
@@ -281,9 +282,16 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
-            "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" " +
-            "WHERE \"CQ_STATE\" = '" + READY_FOR_CLEANING + "'";
+        /*
+         * By filtering on minOpenTxnWaterMark, we only clean up after every transaction that could
+         * see the uncompacted deltas has committed. This way the cleaner can clean up everything
+         * that was made obsolete by this compaction.
+         */
+        String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
+                + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '"
+                + READY_FOR_CLEANING + "'";
+        if (minOpenTxnWaterMark > 0) {
+          s = s + " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
+        }
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
 
@@ -313,7 +321,7 @@ class CompactionTxnHandler extends TxnHandler {
         close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
-      return findReadyToClean();
+      return findReadyToClean(minOpenTxnWaterMark);
     }
   }
 
@@ -747,7 +755,7 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = " +
-            ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) +
+            ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + ", \"CQ_TXN_ID\" = " + compactionTxnId +
             " WHERE \"CQ_ID\" = " + ci.id;
         if(LOG.isDebugEnabled()) {
           LOG.debug("About to execute: " + sqlText);
@@ -1120,37 +1128,12 @@ class CompactionTxnHandler extends TxnHandler {
 
   @Override
   @RetrySemantics.Idempotent
-  public long findMinOpenTxnIdForCleaner() throws MetaException{
+  public long findMinOpenTxnIdForCleaner() throws MetaException {
     Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN;
-        LOG.debug("Going to execute query <" + query + ">");
-        rs = stmt.executeQuery(query);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly initialized.");
-        }
-        long numOpenTxns = rs.getLong(1);
-        if (numOpenTxns > 0) {
-          query = "SELECT MIN(\"RES\".\"ID\") FROM (" +
-              "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN +
-              " UNION " +
-              "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = "
-              + quoteChar(READY_FOR_CLEANING) +
-              ") \"RES\"";
-        } else {
-          query = "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"";
-        }
-        LOG.debug("Going to execute query <" + query + ">");
-        rs = stmt.executeQuery(query);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly initialized, no record found in TXNS");
-        }
-        return rs.getLong(1);
+        return getMinOpenTxnIdWaterMark(dbConn);
       } catch (SQLException e) {
         LOG.error("Unable to getMinOpenTxnIdForCleaner", e);
         rollbackDBConn(dbConn);
@@ -1158,12 +1141,21 @@ class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to execute getMinOpenTxnIfForCleaner() " +
             StringUtils.stringifyException(e));
       } finally {
-        close(rs, stmt, dbConn);
+        closeDbConn(dbConn);
       }
     } catch (RetryException e) {
       return findMinOpenTxnIdForCleaner();
     }
   }
+
+  protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId,
+      long tempId) throws SQLException {
+    super.updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType, commitId, tempId);
+    if (txnType == TxnType.COMPACTION) {
+      stmt.executeUpdate(
+          "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + " WHERE \"CQ_TXN_ID\" = " + txnid);
+    }
+  }
 }
 
 
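The two CompactionTxnHandler changes above pair up: the commit path stamps CQ_NEXT_TXN_ID with
the compaction's commit id, and findReadyToClean() filters on it. A standalone sketch that only
assembles and prints the new query ('r' stands in for the READY_FOR_CLEANING state character;
everything here is illustrative, not the metastore code):

    public class ReadyToCleanQuerySketch {
      public static void main(String[] args) {
        char readyForCleaning = 'r';        // assumed value of TxnStore.READY_FOR_CLEANING
        long minOpenTxnWaterMark = 27L;     // lowest txn id that may still be open
        String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
            + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" "
            + "WHERE \"CQ_STATE\" = '" + readyForCleaning + "'";
        if (minOpenTxnWaterMark > 0) {
          // NULL is kept so entries queued before this schema change remain cleanable
          s += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
        }
        System.out.println(s);
      }
    }
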
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0225db2..e0a78aa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1337,6 +1337,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               }
             }
           }
+        } else if (txnType.get() == TxnType.COMPACTION) {
+          acquireTxnLock(stmt, false);
+          commitId = getHighWaterMark(stmt);
         } else {
           /*
           * current txn didn't update/delete anything (may have inserted), so just proceed with commit
@@ -1508,7 +1511,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType,
+  /**
+   * See overridden method in CompactionTxnHandler also.
+   */
+  protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType,
       Long commitId, long tempId) throws SQLException {
     List<String> queryBatch = new ArrayList<>(5);
     // update write_set with real commitId
@@ -1526,7 +1532,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     if (txnType == TxnType.MATER_VIEW_REBUILD) {
       queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
     }
-
     // execute all in one batch
     executeQueriesInBatchNoCount(dbProduct, stmt, queryBatch, maxBatchSize);
   }
@@ -2228,36 +2233,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   public void performWriteSetGC() throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
-    ResultSet rs = null;
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-
-      long minOpenTxn;
-      rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + TxnStatus.OPEN);
-      if (!rs.next()) {
-        throw new IllegalStateException("Scalar query returned no rows?!?!!");
-      }
-      minOpenTxn = rs.getLong(1);
-      if (rs.wasNull()) {
-        minOpenTxn = Long.MAX_VALUE;
-      }
-      long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
-      /**
-       * We try to find the highest transactionId below everything was committed or aborted.
-       * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary,
-       * because it is guaranteed there won't be open transactions below that.
-       */
-      long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1);
-      LOG.debug("Perform WriteSet GC with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark);
+      long commitHighWaterMark = getMinOpenTxnIdWaterMark(dbConn);
       int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark);
       LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt);
       dbConn.commit();
     } catch (SQLException ex) {
       LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
     } finally {
-      close(rs, stmt, dbConn);
+      close(null, stmt, dbConn);
+    }
+  }
+
+  protected long getMinOpenTxnIdWaterMark(Connection dbConn) throws MetaException, SQLException {
+    /**
+     * We try to find the highest transactionId below which everything was committed or aborted.
+     * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary,
+     * because it is guaranteed there won't be open transactions below that.
+     */
+    long minOpenTxn;
+    try (Statement stmt = dbConn.createStatement()) {
+      try (ResultSet rs = stmt
+          .executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + TxnStatus.OPEN)) {
+        if (!rs.next()) {
+          throw new IllegalStateException("Scalar query returned no rows?!?!!");
+        }
+        minOpenTxn = rs.getLong(1);
+        if (rs.wasNull()) {
+          minOpenTxn = Long.MAX_VALUE;
+        }
+      }
     }
+    long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+
+    LOG.debug("MinOpenTxnIdWaterMark calculated with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark);
+    return Long.min(minOpenTxn, lowWaterMark + 1);
   }
 
   /**
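
getMinOpenTxnIdWaterMark() is also why the tests above sleep for TXN_OPENTXN_TIMEOUT: the
watermark is capped at the open-txn-timeout low boundary plus one, so a transaction id that has
been allocated but is not yet visible in TXNS can never be skipped over. An arithmetic sketch
with illustrative values:

    public class WaterMarkSketch {
      static long waterMark(long minOpenTxn, long openTxnTimeoutLowBoundaryTxnId) {
        // minOpenTxn is Long.MAX_VALUE when no transaction is open
        return Long.min(minOpenTxn, openTxnTimeoutLowBoundaryTxnId + 1);
      }

      public static void main(String[] args) {
        System.out.println(waterMark(Long.MAX_VALUE, 30L)); // 31: nothing open, boundary caught up
        System.out.println(waterMark(Long.MAX_VALUE, 26L)); // 27: boundary still inside the timeout window
        System.out.println(waterMark(25L, 30L));            // 25: an open reader pins the watermark
      }
    }
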
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6cc38b2..d11b88a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -385,10 +385,11 @@ public interface TxnStore extends Configurable {
   /**
    * Find entries in the queue that are ready to
    * be cleaned.
+   * @param minOpenTxnWaterMark Minimum open txnId
    * @return information on the entry in the queue.
    */
   @RetrySemantics.ReadOnly
-  List<CompactionInfo> findReadyToClean() throws MetaException;
+  List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark) throws MetaException;
 
   /**
    * This will remove an entry from the queue after
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 02e960f..bc239e7 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -609,7 +609,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_META_INFO varchar(2048) for bit data,
   CQ_HADOOP_JOB_ID varchar(32),
   CQ_ERROR_MESSAGE clob,
-  CQ_NEXT_TXN_ID bigint
+  CQ_NEXT_TXN_ID bigint,
+  CQ_TXN_ID bigint
 );
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 9d61cbb..2ef5ad9 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -115,5 +115,8 @@ CREATE TABLE "APP"."STORED_PROCS" (
 CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID");
 ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID");
 
+-- HIVE-24291
+ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID bigint;
+
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 4478409..7f9ef92 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1017,7 +1017,8 @@ CREATE TABLE COMPACTION_QUEUE(
     CQ_META_INFO varbinary(2048) NULL,
        CQ_HADOOP_JOB_ID nvarchar(128) NULL,
        CQ_ERROR_MESSAGE varchar(max) NULL,
-    CQ_NEXT_TXN_ID bigint NOT NULL,
+    CQ_NEXT_TXN_ID bigint NULL,
+    CQ_TXN_ID bigint NULL
 PRIMARY KEY CLUSTERED
 (
        CQ_ID ASC
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 33b6587..148d089 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -151,6 +151,9 @@ CREATE TABLE "STORED_PROCS" (
 CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID");
 ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID");
 
+-- HIVE-24291
+ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID bigint NULL;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index b2e6983..88c9576 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1077,7 +1077,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_META_INFO varbinary(2048),
   CQ_HADOOP_JOB_ID varchar(32),
   CQ_ERROR_MESSAGE mediumtext,
-  CQ_NEXT_TXN_ID bigint
+  CQ_NEXT_TXN_ID bigint,
+  CQ_TXN_ID bigint
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE TABLE COMPLETED_COMPACTIONS (
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 544ec15..039e2bf 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -123,6 +123,9 @@ CREATE TABLE STORED_PROCS (
 CREATE UNIQUE INDEX UNIQUESTOREDPROC ON STORED_PROCS (NAME, DB_ID);
 ALTER TABLE `STORED_PROCS` ADD CONSTRAINT `STOREDPROC_FK1` FOREIGN KEY (`DB_ID`) REFERENCES DBS (`DB_ID`);
 
+-- HIVE-24291
+ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID bigint;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index cde4b90..39acc71 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1058,7 +1058,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_META_INFO BLOB,
   CQ_HADOOP_JOB_ID varchar2(32),
   CQ_ERROR_MESSAGE CLOB,
-  CQ_NEXT_TXN_ID NUMBER(19)
+  CQ_NEXT_TXN_ID NUMBER(19),
+  CQ_TXN_ID NUMBER(19)
 ) ROWDEPENDENCIES;
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index d83d722..59b645e 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -120,6 +120,9 @@ CREATE TABLE "STORED_PROCS" (
 CREATE UNIQUE INDEX UNIQUESTOREDPROC ON STORED_PROCS ("NAME", "DB_ID");
 ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID");
 
+-- HIVE-24291
+ALTER TABLE COMPACTION_QUEUE ADD CQ_TXN_ID NUMBER(19);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 57b323c..b673f14 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1743,7 +1743,8 @@ CREATE TABLE "COMPACTION_QUEUE" (
   "CQ_META_INFO" bytea,
   "CQ_HADOOP_JOB_ID" varchar(32),
   "CQ_ERROR_MESSAGE" text,
-  "CQ_NEXT_TXN_ID" bigint
+  "CQ_NEXT_TXN_ID" bigint,
+  "CQ_TXN_ID" bigint
 );
 
 CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" (
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 8b3792c..be23744 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -247,6 +247,9 @@ CREATE TABLE "STORED_PROCS" (
 CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID");
 ALTER TABLE ONLY "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID") DEFERRABLE;
 
+-- HIVE-24291
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_TXN_ID" bigint;
+
 -- These lines need to be last. Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
