This is an automated email from the ASF dual-hosted git repository.

sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f3439697343 HIVE-27637: Compare highest write ID of compaction records 
when trying to perform abort cleanup (#4740) (Zsolt Miskolczi reviewed by 
Attila Turoczy, Sourabh Badhya)
f3439697343 is described below

commit f3439697343f3d5e1f1d007d8c878a6eb821713b
Author: InvisibleProgrammer <zsolt.miskol...@gmail.com>
AuthorDate: Fri Feb 2 09:32:26 2024 +0100

    HIVE-27637: Compare highest write ID of compaction records when trying to 
perform abort cleanup (#4740) (Zsolt Miskolczi reviewed by Attila Turoczy, 
Sourabh Badhya)
---
 .../metastore/txn/TestCompactionTxnHandler.java    | 44 ++++++++++++++++++++++
 .../compactor/handler/TestAbortedTxnCleaner.java   | 17 ++++++++-
 .../txn/jdbc/queries/ReadyToCleanAbortHandler.java | 12 +++++-
 3 files changed, 70 insertions(+), 3 deletions(-)

diff --git 
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
 
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index d26f3774af7..cd02eb1ba3e 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -1056,6 +1056,50 @@ public class TestCompactionTxnHandler {
     assertEquals(1, potentials.size());
   }
 
+  @Test
+  public void testFindReadyToCleanAborts() throws Exception {
+    long txnId = openTxn();
+
+    List<LockComponent> components = new ArrayList<>();
+    components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, 
"mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
+    components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, 
"mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE));
+
+    allocateTableWriteIds("mydb", "mytable", txnId);
+    allocateTableWriteIds("mydb", "yourtable", txnId);
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnId);
+    LockResponse res = txnHandler.lock(req);
+    assertSame(res.getState(), LockState.ACQUIRED);
+
+    txnHandler.abortTxn(new AbortTxnRequest((txnId)));
+
+    txnId = openTxn();
+    components = new ArrayList<>();
+    components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, 
"mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
+    allocateTableWriteIds("mydb", "mytable", txnId);
+
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnId);
+    res = txnHandler.lock(req);
+    assertSame(res.getState(), LockState.ACQUIRED);
+
+    CompactionRequest rqst = new CompactionRequest("mydb", "mytable", 
CompactionType.MINOR);
+    rqst.setPartitionname("mypartition=myvalue");
+    txnHandler.compact(rqst);
+
+    CompactionInfo ci = 
txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
+    assertNotNull(ci);
+    ci.highestWriteId = 41;
+    txnHandler.updateCompactorState(ci, 0);
+
+    List<CompactionInfo> potentials = txnHandler.findReadyToCleanAborts(1, 0);
+    assertEquals(1, potentials.size());
+    CompactionInfo potentialToCleanAbort = potentials.get(0);
+    assertEquals("mydb", potentialToCleanAbort.dbname);
+    assertEquals("yourtable", potentialToCleanAbort.tableName);
+  }
+
   private static FindNextCompactRequest aFindNextCompactRequest(String 
workerId, String workerVersion) {
     FindNextCompactRequest request = new FindNextCompactRequest();
     request.setWorkerId(workerId);
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 8f6814d4890..1d31aae9434 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -44,7 +44,9 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -282,8 +284,21 @@ public class TestAbortedTxnCleaner extends TestHandler {
     cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
     cleaner.run();
 
-    Mockito.verify(mockedFSRemover, 
Mockito.times(1)).clean(any(CleanupRequest.class));
+    Mockito.verifyNoInteractions(mockedFSRemover);
     Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    String compactionQueuePresence = "SELECT COUNT(*) FROM 
\"COMPACTION_QUEUE\" " +
+            " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + 
tableName + "' AND \"CQ_PARTITION\" IS NULL";
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, 
compactionQueuePresence));
+
+    directories = getDirectories(conf, t, null);
+    // Both base and delta files are present since the cleaner skips them as 
there is a newer write.
+    Assert.assertEquals(5, directories.size());
+    Assert.assertEquals(1, directories.stream().filter(dir -> 
dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
+
+    // Run compaction and clean up
+    startInitiator();
+    startWorker();
+    startCleaner();
 
     directories = getDirectories(conf, t, null);
     // The table is already compacted, so we must see 1 base delta
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
index 4940d384095..eebe29dc441 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
@@ -52,7 +52,7 @@ public class ReadyToCleanAbortHandler implements 
QueryHandler<List<CompactionInf
           // First sub-query - Gets the aborted txns with min txn start time, 
number of aborted txns
           // for corresponding db, table, partition.
           " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", 
MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
-          " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", 
\"TXN_COMPONENTS\" " +
+          " MAX(\"TC_WRITEID\") as \"MAX_ABORTED_WRITE_ID\", COUNT(*) AS 
\"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
           " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState" 
+
           " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) 
\"res1\" " +
           " LEFT JOIN" +
@@ -76,7 +76,15 @@ public class ReadyToCleanAbortHandler implements 
QueryHandler<List<CompactionInf
           " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
           " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
           " OR (\"res1\".\"TC_PARTITION\" IS NULL AND 
\"res3\".\"CQ_PARTITION\" IS NULL))" +
-          " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR 
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
+          " WHERE (\"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR 
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL)" +
+          " AND NOT EXISTS (SELECT 1 " +
+          " FROM \"COMPACTION_QUEUE\" AS \"cq\" " +
+          " WHERE \"cq\".\"CQ_DATABASE\" = \"res1\".\"TC_DATABASE\" AND 
\"cq\".\"CQ_TABLE\" = \"res1\".\"TC_TABLE\"" +
+          " AND (\"cq\".\"CQ_PARTITION\" = \"res1\".\"TC_PARTITION\"" +
+          " OR (\"cq\".\"CQ_PARTITION\" IS NULL AND \"res1\".\"TC_PARTITION\" 
IS NULL))" +
+          " AND \"cq\".\"CQ_HIGHEST_WRITE_ID\" > 
\"res1\".\"MAX_ABORTED_WRITE_ID\"" +
+          " AND \"cq\".\"CQ_STATE\" " +
+          " IN ('i', 'w', 'r'))";
 
   private final long abortedTimeThreshold;
   private final int abortedThreshold;

Reply via email to the mailing list.