This is an automated email from the ASF dual-hosted git repository. sbadhya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new f3439697343 HIVE-27637: Compare highest write ID of compaction records when trying to perform abort cleanup (#4740) (Zsolt Miskolczi reviewed by Attila Turoczy, Sourabh Badhya)
f3439697343 is described below

commit f3439697343f3d5e1f1d007d8c878a6eb821713b
Author: InvisibleProgrammer <zsolt.miskol...@gmail.com>
AuthorDate: Fri Feb 2 09:32:26 2024 +0100

    HIVE-27637: Compare highest write ID of compaction records when trying to perform abort cleanup (#4740) (Zsolt Miskolczi reviewed by Attila Turoczy, Sourabh Badhya)
---
 .../metastore/txn/TestCompactionTxnHandler.java    | 44 ++++++++++++++++++++++
 .../compactor/handler/TestAbortedTxnCleaner.java   | 17 ++++++++-
 .../txn/jdbc/queries/ReadyToCleanAbortHandler.java | 12 +++++-
 3 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index d26f3774af7..cd02eb1ba3e 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -1056,6 +1056,50 @@ public class TestCompactionTxnHandler {
     assertEquals(1, potentials.size());
   }
 
+  @Test
+  public void testFindReadyToCleanAborts() throws Exception {
+    long txnId = openTxn();
+
+    List<LockComponent> components = new ArrayList<>();
+    components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
+    components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE));
+
+    allocateTableWriteIds("mydb", "mytable", txnId);
+    allocateTableWriteIds("mydb", "yourtable", txnId);
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnId);
+    LockResponse res = txnHandler.lock(req);
+    assertSame(res.getState(), LockState.ACQUIRED);
+
+    txnHandler.abortTxn(new AbortTxnRequest((txnId)));
+
+    txnId = openTxn();
+    components = new ArrayList<>();
+    components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
+    allocateTableWriteIds("mydb", "mytable", txnId);
+
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnId);
+    res = txnHandler.lock(req);
+    assertSame(res.getState(), LockState.ACQUIRED);
+
+    CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MINOR);
+    rqst.setPartitionname("mypartition=myvalue");
+    txnHandler.compact(rqst);
+
+    CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
+    assertNotNull(ci);
+    ci.highestWriteId = 41;
+    txnHandler.updateCompactorState(ci, 0);
+
+    List<CompactionInfo> potentials = txnHandler.findReadyToCleanAborts(1, 0);
+    assertEquals(1, potentials.size());
+    CompactionInfo potentialToCleanAbort = potentials.get(0);
+    assertEquals("mydb", potentialToCleanAbort.dbname);
+    assertEquals("yourtable", potentialToCleanAbort.tableName);
+  }
+
   private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
     FindNextCompactRequest request = new FindNextCompactRequest();
     request.setWorkerId(workerId);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 8f6814d4890..1d31aae9434 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -44,7 +44,9 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -282,8 +284,21 @@ public class TestAbortedTxnCleaner extends TestHandler {
     cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
     cleaner.run();
 
-    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+    Mockito.verifyNoInteractions(mockedFSRemover);
     Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    String compactionQueuePresence = "SELECT COUNT(*) FROM \"COMPACTION_QUEUE\" " +
+        " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\" IS NULL";
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
+
+    directories = getDirectories(conf, t, null);
+    // Both base and delta files are present since the cleaner skips them as there is a newer write.
+    Assert.assertEquals(5, directories.size());
+    Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
+
+    // Run compaction and clean up
+    startInitiator();
+    startWorker();
+    startCleaner();
 
     directories = getDirectories(conf, t, null);
     // The table is already compacted, so we must see 1 base delta
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
index 4940d384095..eebe29dc441 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
@@ -52,7 +52,7 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf
       // First sub-query - Gets the aborted txns with min txn start time, number of aborted txns
       // for corresponding db, table, partition.
       " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
-      " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+      " MAX(\"TC_WRITEID\") as \"MAX_ABORTED_WRITE_ID\", COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
       " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState" +
       " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) \"res1\" " +
       " LEFT JOIN" +
@@ -76,7 +76,15 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf
       " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
       " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
       " OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res3\".\"CQ_PARTITION\" IS NULL))" +
-      " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
+      " WHERE (\"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL)" +
+      " AND NOT EXISTS (SELECT 1 " +
+      " FROM \"COMPACTION_QUEUE\" AS \"cq\" " +
+      " WHERE \"cq\".\"CQ_DATABASE\" = \"res1\".\"TC_DATABASE\" AND \"cq\".\"CQ_TABLE\" = \"res1\".\"TC_TABLE\"" +
+      " AND (\"cq\".\"CQ_PARTITION\" = \"res1\".\"TC_PARTITION\"" +
+      " OR (\"cq\".\"CQ_PARTITION\" IS NULL AND \"res1\".\"TC_PARTITION\" IS NULL))" +
+      " AND \"cq\".\"CQ_HIGHEST_WRITE_ID\" > \"res1\".\"MAX_ABORTED_WRITE_ID\"" +
+      " AND \"cq\".\"CQ_STATE\" " +
+      " IN ('i', 'w', 'r'))";
 
   private final long abortedTimeThreshold;
   private final int abortedThreshold;