This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 0e119ea HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage) 0e119ea is described below commit 0e119eaddb93dc10743bb8990ce8eca4fb77cf16 Author: Denys Kuzmenko <dkuzme...@cloudera.com> AuthorDate: Wed Dec 8 10:59:48 2021 +0200 HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage) Closes #2825 --- .../hive/ql/txn/compactor/TestCompactor.java | 4 ++ .../metastore/txn/TestCompactionTxnHandler.java | 35 ++++++++--- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 2 + .../apache/hadoop/hive/ql/TestTxnCommands3.java | 1 + .../hadoop/hive/ql/txn/compactor/TestCleaner.java | 1 + .../hive/metastore/txn/CompactionTxnHandler.java | 69 ++++++++++++---------- 6 files changed, 72 insertions(+), 40 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 7e48419..13705be 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1621,6 +1621,10 @@ public class TestCompactor { verifyFooBarResult(tblName, 2); verifyHasBase(table.getSd(), fs, "base_0000005_v0000016"); runCleaner(conf); + // in case when we have # of accumulated entries for the same table/partition - we need to process them one-by-one in ASC order of write_id's, + // however, to support multi-threaded processing in the Cleaner, we have to move entries from the same group to the next Cleaner cycle, + // so that they are not processed by multiple threads concurrently. + runCleaner(conf); verifyDeltaCount(table.getSd(), fs, 0); } diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index ea1abc6..9bfc324 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -51,6 +51,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.SortedSet; @@ -196,8 +197,9 @@ public class TestCompactionTxnHandler { assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); + + ci.highestWriteId = 41; + txnHandler.updateCompactorState(ci, 0); txnHandler.markCompacted(ci); assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); @@ -225,8 +227,9 @@ public class TestCompactionTxnHandler { assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); + + ci.highestWriteId = 41; + txnHandler.updateCompactorState(ci, 0); txnHandler.markCompacted(ci); assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); @@ -721,8 +724,9 @@ public class TestCompactionTxnHandler { public void testMarkCleanedCleansTxnsAndTxnComponents() throws Exception { long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); + long mytableWriteId = allocateTableWriteIds("mydb", "mytable", txnid); + + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); @@ -746,6 +750,8 @@ public class TestCompactionTxnHandler { txnHandler.abortTxn(new AbortTxnRequest(txnid)); txnid = openTxn(); + long fooWriteId = allocateTableWriteIds("mydb", "foo", txnid); + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("foo"); comp.setPartitionname("bar=compact"); @@ -769,7 +775,7 @@ public class TestCompactionTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); txnHandler.abortTxn(new AbortTxnRequest(txnid)); - CompactionInfo ci = new CompactionInfo(); + CompactionInfo ci; // Now clean them and check that they are removed from the count. CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR); @@ -777,8 +783,11 @@ public class TestCompactionTxnHandler { assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); + + ci.highestWriteId = mytableWriteId; + txnHandler.updateCompactorState(ci, 0); txnHandler.markCompacted(ci); - + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0); assertEquals(1, toClean.size()); @@ -801,6 +810,9 @@ public class TestCompactionTxnHandler { assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); + + ci.highestWriteId = fooWriteId; + txnHandler.updateCompactorState(ci, 0); txnHandler.markCompacted(ci); toClean = txnHandler.findReadyToClean(0, 0); @@ -944,5 +956,12 @@ public class TestCompactionTxnHandler { List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); return txns.get(0); } + + private long allocateTableWriteIds (String dbName, String tblName, long txnid) throws Exception { + AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tblName); + rqst.setTxnIds(Collections.singletonList(txnid)); + AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst); + return writeIds.getTxnToWriteIds().get(0).getWriteId(); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index e82abdd..4a8bbc0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -806,6 +806,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests { // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories Assert.assertEquals(7, status.length); runCleaner(hiveConf); + runCleaner(hiveConf); // There should be only 1 directory left: base_0000001. // Original bucket files, delta directories and previous base directory should have been cleaned up. status = fs.listStatus(new Path(getWarehouseDir() + "/" + @@ -2047,6 +2048,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests { // 5. Run Cleaner. It should remove the 2 delta dirs and 1 old base dir. runCleaner(hiveConf); + runCleaner(hiveConf); // There should be only 1 directory left: base_xxxxxxx. // The delta dirs should have been cleaned up. status = fs.listStatus(new Path(getWarehouseDir() + "/" + diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index ec9f5a0..8345832 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -389,6 +389,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests { so cleaner removes all files shadowed by it (which is everything in this case) */ runCleaner(hiveConf); + runCleaner(hiveConf); expectedList = new String[] { "/t/delta_0000001_0000003_v0000020" diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index a1205f4..42c5a04 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -591,6 +591,7 @@ public class TestCleaner extends CompactorTest { // unblock the cleaner and run again txnHandler.commitTxn(new CommitTxnRequest(blockingTxn)); startCleaner(); + startCleaner(); // make sure cleaner removed everything below base_24, and both compactions are successful paths = getDirectories(conf, t, p); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index f9d5a7e..4a9a6ef 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -323,57 +323,62 @@ class CompactionTxnHandler extends TxnHandler { @Override @RetrySemantics.ReadOnly public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long retentionTime) throws MetaException { - Connection dbConn = null; - List<CompactionInfo> rc = new ArrayList<>(); - - Statement stmt = null; - ResultSet rs = null; try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); + List<CompactionInfo> rc = new ArrayList<>(); + + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Statement stmt = dbConn.createStatement()) { /* * By filtering on minOpenTxnWaterMark, we will only cleanup after every transaction is committed, that could see * the uncompacted deltas. This way the cleaner can clean up everything that was made obsolete by this compaction. */ - String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " - + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" - + READY_FOR_CLEANING + "'"; + String whereClause = " WHERE \"CQ_STATE\" = '" + READY_FOR_CLEANING + "'"; if (minOpenTxnWaterMark > 0) { - s = s + " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)"; + whereClause += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)"; } if (retentionTime > 0) { - s = s + " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ")"; + whereClause += " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ")"; } - s = s + " ORDER BY \"CQ_HIGHEST_WRITE_ID\", \"CQ_ID\""; + String s = "SELECT \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," + + " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\"" + + " FROM \"COMPACTION_QUEUE\" \"cq1\" " + + "INNER JOIN (" + + " SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" + + " FROM \"COMPACTION_QUEUE\"" + + whereClause + + " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"cq2\" " + + "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+ + " AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+ + " AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" + + " OR \"cq1\".\"CQ_PARTITION\" IS NULL AND \"cq2\".\"CQ_PARTITION\" IS NULL)" + + whereClause + + " AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" + + " ORDER BY \"CQ_ID\""; LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); - info.runAs = rs.getString(6); - info.highestWriteId = rs.getLong(7); - if (LOG.isDebugEnabled()) { - LOG.debug("Found ready to clean: " + info.toString()); + try (ResultSet rs = stmt.executeQuery(s)) { + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); + info.runAs = rs.getString(6); + info.highestWriteId = rs.getLong(7); + if (LOG.isDebugEnabled()) { + LOG.debug("Found ready to clean: " + info.toString()); + } + rc.add(info); } - rc.add(info); } return rc; } catch (SQLException e) { LOG.error("Unable to select next element for cleaning, " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); checkRetryable(e, "findReadyToClean"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } + } } catch (RetryException e) { return findReadyToClean(minOpenTxnWaterMark, retentionTime); }