SourabhBadhya commented on code in PR #4313:
URL: https://github.com/apache/hive/pull/4313#discussion_r1219106647
##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java:
##########
@@ -320,4 +329,397 @@ public void testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception
     List<Path> directories = getDirectories(conf, t, null);
     Assert.assertEquals(5, directories.size());
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean isPartitioned) throws Exception {
+    String dbName = "default", tableName = "abort_cleanup_not_populating_compaction_tables_test", partName = "today";
+    Table t = newTable(dbName, tableName, isPartitioned);
+    Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // 3 aborted deltas & one committed delta
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover));
+
+    runInitiator(conf);
+    // Initiator must not add anything to compaction_queue
+    String compactionQueuePresence = "SELECT COUNT(*) FROM \"COMPACTION_QUEUE\" " +
+            " WHERE \"CQ_DATABASE\" = '" + dbName + "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL");
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
+
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
+            " WHERE \"CC_DATABASE\" = '" + dbName + "' AND \"CC_TABLE\" = '" + tableName + "' AND \"CC_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" " +
+            " WHERE \"CTC_DATABASE\" = '" + dbName + "' AND \"CTC_TABLE\" = '" + tableName + "' AND \"CTC_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+
+    List<Path> directories = getDirectories(conf, t, null);
+    // All aborted directories removed, hence 1 committed delta directory must be present
+    Assert.assertEquals(1, directories.size());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRetryEntryOnFailures(boolean isPartitioned) throws Exception {
+    String dbName = "default", tableName = "handler_retry_entry", partName = "today";
+    Table t = newTable(dbName, tableName, isPartitioned);
+    Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // Add 2 committed deltas and 2 aborted deltas
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+    TxnStore mockedTxnHandler = Mockito.spy(txnHandler);
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, mockedTxnHandler, metadataCache,
+            false, mockedFSRemover));
+    // Throw a runtime exception when FSRemover#clean is invoked so the handler's failure path is exercised.
+    Mockito.doAnswer(invocationOnMock -> {
+      throw new RuntimeException("Testing retry");
+    }).when(mockedFSRemover).clean(any());
+
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    Mockito.verify(mockedTxnHandler, Mockito.times(1)).setAbortCleanerRetryRetentionTimeOnError(any(AbortTxnRequestInfo.class));
+    String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName + "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPACTION_QUEUE\" " + whereClause));
+    String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
Review Comment:
Used showCompact output. Done.
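
For reference, a minimal sketch of how the showCompact output could drive that assertion (illustrative only; the filtering and the expected state string are assumptions, while TxnStore#showCompact, ShowCompactRequest and ShowCompactResponseElement are existing metastore classes):

    // Illustrative sketch (requires java.util.stream.Collectors): read the queue state
    // through the metastore API instead of querying the backend DB directly.
    ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
    List<ShowCompactResponseElement> compacts = scr.getCompacts().stream()
            .filter(e -> dbName.equals(e.getDbname()) && tableName.equals(e.getTablename()))
            .collect(Collectors.toList());
    Assert.assertEquals(1, compacts.size());
    // "ready for cleaning" is assumed here to be the display form of CQ_STATE = 'r'.
    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());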
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java:
##########
@@ -118,10 +120,16 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
         abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
       } catch (InterruptedException e) {
+        LOG.error("Caught an interrupted exception when cleaning, unable to complete cleaning of {} due to {}", info,
+                e.getMessage());
+        info.errorMessage = e.getMessage();
+        handleCleanerAttemptFailure(info);
         throw e;
       } catch (Exception e) {
         LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
                 e.getMessage());
+        info.errorMessage = e.getMessage();
+        handleCleanerAttemptFailure(info);
Review Comment:
Done.
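
As background for the retry behaviour that handleCleanerAttemptFailure feeds into, here is a tiny self-contained sketch of a grow-on-failure retention policy of the kind CQ_RETRY_RETENTION stores (the doubling step, base value and cap below are assumptions for illustration, not the PR's actual TxnStore logic):

    import java.util.concurrent.TimeUnit;

    /** Illustrative only: widen the retention window after each failed cleanup attempt. */
    public class RetryRetentionSketch {
      // Assumed base retention and attempt cap; real values would come from Hive/metastore configuration.
      private static final long BASE_RETENTION_MS = TimeUnit.MINUTES.toMillis(5);
      private static final int MAX_ATTEMPTS = 5;

      /** How long a failed queue entry is skipped before the next cleanup attempt. */
      static long nextRetentionMs(int failedAttempts) {
        int capped = Math.max(1, Math.min(failedAttempts, MAX_ATTEMPTS));
        return BASE_RETENTION_MS << (capped - 1); // 5m, 10m, 20m, 40m, 80m
      }

      public static void main(String[] args) {
        for (int attempt = 1; attempt <= 4; attempt++) {
          System.out.println("attempt " + attempt + " -> skip for " + nextRetentionMs(attempt) + " ms");
        }
      }
    }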
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]