This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0e119ea HIVE-25781: Restore multi-threaded support in Cleaner after
HIVE-25115 (Denys Kuzmenko, reviewed by Karen Coppage)
0e119ea is described below
commit 0e119eaddb93dc10743bb8990ce8eca4fb77cf16
Author: Denys Kuzmenko
AuthorDate: Wed Dec 8 10:59:48 2021 +0200
HIVE-25781: Restore multi-threaded support in Cleaner after HIVE-25115
(Denys Kuzmenko, reviewed by Karen Coppage)
Closes #2825
---
.../hive/ql/txn/compactor/TestCompactor.java | 4 ++
.../metastore/txn/TestCompactionTxnHandler.java| 35 ---
.../apache/hadoop/hive/ql/TestTxnCommands2.java| 2 +
.../apache/hadoop/hive/ql/TestTxnCommands3.java| 1 +
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 1 +
.../hive/metastore/txn/CompactionTxnHandler.java | 69 --
6 files changed, 72 insertions(+), 40 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7e48419..13705be 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1621,6 +1621,10 @@ public class TestCompactor {
verifyFooBarResult(tblName, 2);
verifyHasBase(table.getSd(), fs, "base_005_v016");
runCleaner(conf);
+// in case we have a number of accumulated entries for the same
table/partition, we need to process them one-by-one in ASC order of write_id's,
+// however, to support multi-threaded processing in the Cleaner, we have
to move entries from the same group to the next Cleaner cycle,
+// so that they are not processed by multiple threads concurrently.
+runCleaner(conf);
verifyDeltaCount(table.getSd(), fs, 0);
}
diff --git
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index ea1abc6..9bfc324 100644
---
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -51,6 +51,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -196,8 +197,9 @@ public class TestCompactionTxnHandler {
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
CompactionInfo ci =
txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
-
-assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
+
+ci.highestWriteId = 41;
+txnHandler.updateCompactorState(ci, 0);
txnHandler.markCompacted(ci);
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred",
WORKER_VERSION)));
@@ -225,8 +227,9 @@ public class TestCompactionTxnHandler {
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
CompactionInfo ci =
txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
-
-assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
+
+ci.highestWriteId = 41;
+txnHandler.updateCompactorState(ci, 0);
txnHandler.markCompacted(ci);
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred",
WORKER_VERSION)));
@@ -721,8 +724,9 @@ public class TestCompactionTxnHandler {
public void testMarkCleanedCleansTxnsAndTxnComponents()
throws Exception {
long txnid = openTxn();
-LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-"mydb");
+long mytableWriteId = allocateTableWriteIds("mydb", "mytable", txnid);
+
+LockComponent comp = new LockComponent(LockType.SHARED_WRITE,
LockLevel.DB, "mydb");
comp.setTablename("mytable");
comp.setOperationType(DataOperationType.INSERT);
List components = new ArrayList(1);
@@ -746,6 +750,8 @@ public class TestCompactionTxnHandler {
txnHandler.abortTxn(new AbortTxnRequest(txnid));
txnid = openTxn();
+long fooWriteId = allocateTableWriteIds("mydb", "foo", txnid);
+
comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
comp.setTablename("foo");
comp.setPartitionname("bar=compact");
@@ -769,7 +775,7 @@ public class TestCompactionTxnHandler {
assertTrue(res.getState() == LockState.ACQUIRED);
txnHandler.abortTxn(new AbortTxnRequest(txnid));
-CompactionInfo ci = new CompactionInfo();
+CompactionInfo ci;
// Now clean them