This is an automated email from the ASF dual-hosted git repository. sbadhya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 0f8190fefa5 Revert "HIVE-27637: Compare highest write ID of compaction records when trying to perform abort cleanup (Zsolt Miskolczi reviewed by Attila Turoczy, Sourabh Badhya)" (#5058) 0f8190fefa5 is described below commit 0f8190fefa56513a042566ef87b852e8343610aa Author: Sourabh Badhya <iamsbad...@gmail.com> AuthorDate: Fri Feb 2 18:59:36 2024 +0530 Revert "HIVE-27637: Compare highest write ID of compaction records when trying to perform abort cleanup (Zsolt Miskolczi reviewed by Attila Turoczy, Sourabh Badhya)" (#5058) This reverts commit f3439697343f3d5e1f1d007d8c878a6eb821713b. --- .../metastore/txn/TestCompactionTxnHandler.java | 44 ---------------------- .../compactor/handler/TestAbortedTxnCleaner.java | 17 +-------- .../txn/jdbc/queries/ReadyToCleanAbortHandler.java | 12 +----- 3 files changed, 3 insertions(+), 70 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index cd02eb1ba3e..d26f3774af7 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -1056,50 +1056,6 @@ public class TestCompactionTxnHandler { assertEquals(1, potentials.size()); } - @Test - public void testFindReadyToCleanAborts() throws Exception { - long txnId = openTxn(); - - List<LockComponent> components = new ArrayList<>(); - components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE)); - components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE)); - - allocateTableWriteIds("mydb", "mytable", txnId); - allocateTableWriteIds("mydb", "yourtable", txnId); - - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnId); - LockResponse res = txnHandler.lock(req); - assertSame(res.getState(), LockState.ACQUIRED); - - txnHandler.abortTxn(new AbortTxnRequest((txnId))); - - txnId = openTxn(); - components = new ArrayList<>(); - components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE)); - allocateTableWriteIds("mydb", "mytable", txnId); - - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnId); - res = txnHandler.lock(req); - assertSame(res.getState(), LockState.ACQUIRED); - - CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MINOR); - rqst.setPartitionname("mypartition=myvalue"); - txnHandler.compact(rqst); - - CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); - assertNotNull(ci); - ci.highestWriteId = 41; - txnHandler.updateCompactorState(ci, 0); - - List<CompactionInfo> potentials = txnHandler.findReadyToCleanAborts(1, 0); - assertEquals(1, potentials.size()); - CompactionInfo potentialToCleanAbort = potentials.get(0); - assertEquals("mydb", potentialToCleanAbort.dbname); - assertEquals("yourtable", potentialToCleanAbort.tableName); - } - private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) { FindNextCompactRequest request = new FindNextCompactRequest(); request.setWorkerId(workerId); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java index 1d31aae9434..8f6814d4890 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java @@ -44,9 +44,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -284,21 +282,8 @@ public class TestAbortedTxnCleaner extends TestHandler { cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); cleaner.run(); - Mockito.verifyNoInteractions(mockedFSRemover); + Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(); - String compactionQueuePresence = "SELECT COUNT(*) FROM \"COMPACTION_QUEUE\" " + - " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\" IS NULL"; - Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence)); - - directories = getDirectories(conf, t, null); - // Both base and delta files are present since the cleaner skips them as there is a newer write. - Assert.assertEquals(5, directories.size()); - Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count()); - - // Run compaction and clean up - startInitiator(); - startWorker(); - startCleaner(); directories = getDirectories(conf, t, null); // The table is already compacted, so we must see 1 base delta diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java index eebe29dc441..4940d384095 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java @@ -52,7 +52,7 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf // First sub-query - Gets the aborted txns with min txn start time, number of aborted txns // for corresponding db, table, partition. " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " + - " MAX(\"TC_WRITEID\") as \"MAX_ABORTED_WRITE_ID\", COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " + + " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " + " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState" + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) \"res1\" " + " LEFT JOIN" + @@ -76,15 +76,7 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " + " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " + " OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res3\".\"CQ_PARTITION\" IS NULL))" + - " WHERE (\"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL)" + - " AND NOT EXISTS (SELECT 1 " + - " FROM \"COMPACTION_QUEUE\" AS \"cq\" " + - " WHERE \"cq\".\"CQ_DATABASE\" = \"res1\".\"TC_DATABASE\" AND \"cq\".\"CQ_TABLE\" = \"res1\".\"TC_TABLE\"" + - " AND (\"cq\".\"CQ_PARTITION\" = \"res1\".\"TC_PARTITION\"" + - " OR (\"cq\".\"CQ_PARTITION\" IS NULL AND \"res1\".\"TC_PARTITION\" IS NULL))" + - " AND \"cq\".\"CQ_HIGHEST_WRITE_ID\" > \"res1\".\"MAX_ABORTED_WRITE_ID\"" + - " AND \"cq\".\"CQ_STATE\" " + - " IN ('i', 'w', 'r'))"; + " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL"; private final long abortedTimeThreshold; private final int abortedThreshold;