This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new fb310423274 branch-2.1: [fix](hive)Clear processed tasks #45309
(#45339)
fb310423274 is described below
commit fb3104232744434f30d3fe92cf00d6d92f27e962
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 17 22:00:57 2024 +0800
branch-2.1: [fix](hive)Clear processed tasks #45309 (#45339)
Cherry-picked from #45309
Co-authored-by: wuwenchi <[email protected]>
---
.../doris/datasource/hive/HMSTransaction.java | 12 +++++++
.../doris/datasource/hive/HmsCommitTest.java | 40 +++++++++++++++++++++-
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 8041904723a..e20604c76e5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -527,6 +527,11 @@ public class HMSTransaction implements Transaction {
return partitions;
}
+ public void clear() {
+ partitions.clear();
+ createdPartitionValues.clear();
+ }
+
public void addPartition(HivePartitionWithStatistics partition) {
partitions.add(partition);
}
@@ -1136,6 +1141,7 @@ public class HMSTransaction implements Transaction {
for (CompletableFuture<?> undoUpdateFuture :
undoUpdateFutures.build()) {
MoreFutures.getFutureValue(undoUpdateFuture);
}
+ updateStatisticsTasks.clear();
}
private void undoAddPartitionsTask() {
@@ -1150,6 +1156,7 @@ public class HMSTransaction implements Transaction {
LOG.warn("Failed to rollback: add_partition for partition
values {}.{}",
tableInfo, rollbackFailedPartitions);
}
+ addPartitionsTask.clear();
}
private void waitForAsyncFileSystemTaskSuppressThrowable() {
@@ -1162,6 +1169,7 @@ public class HMSTransaction implements Transaction {
// ignore
}
}
+ asyncFileSystemTaskFutures.clear();
}
public void prepareInsertExistingTable(SimpleTableInfo tableInfo,
TableAndMore tableAndMore) {
@@ -1312,6 +1320,7 @@ public class HMSTransaction implements Transaction {
for (DirectoryCleanUpTask cleanUpTask :
directoryCleanUpTasksForAbort) {
recursiveDeleteItems(cleanUpTask.getPath(),
cleanUpTask.isDeleteEmptyDir(), false);
}
+ directoryCleanUpTasksForAbort.clear();
}
private void runRenameDirTasksForAbort() {
@@ -1327,6 +1336,7 @@ public class HMSTransaction implements Transaction {
}
}
}
+ renameDirectoryTasksForAbort.clear();
}
private void runClearPathsForFinish() {
@@ -1479,6 +1489,7 @@ public class HMSTransaction implements Transaction {
.build());
}, fileSystemExecutor));
}
+ uncompletedMpuPendingUploads.clear();
}
public void doNothing() {
@@ -1513,6 +1524,7 @@ public class HMSTransaction implements Transaction {
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
MoreFutures.getFutureValue(future, RuntimeException.class);
}
+ asyncFileSystemTaskFutures.clear();
}
public void shutdownExecutorService() {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 27464963ce1..3e296053507 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -675,5 +675,43 @@ public class HmsCommitTest {
Partition pa = hmsClient.getPartition(dbName, tbWithPartition,
Lists.newArrayList("a"));
assertNumRows(3, pa);
}
-}
+ @Test
+ public void testCommitWithRollback() {
+ genQueryID();
+ List<THivePartitionUpdate> pus = new ArrayList<>();
+ try {
+ pus.add(createRandomAppend(null));
+ pus.add(createRandomAppend(null));
+ pus.add(createRandomAppend(null));
+ } catch (Throwable t) {
+ Assert.fail();
+ }
+
+ mockDoOther(() -> {
+ Table table = hmsClient.getTable(dbName, tbWithoutPartition);
+ assertNumRows(3, table);
+ });
+
+ HMSTransaction hmsTransaction = new HMSTransaction(hmsOps,
fileSystemProvider, fileSystemExecutor);
+ try {
+ hmsTransaction.setHivePartitionUpdates(pus);
+ HiveInsertCommandContext ctx = new HiveInsertCommandContext();
+ String queryId = DebugUtil.printId(ConnectContext.get().queryId());
+ ctx.setQueryId(queryId);
+ ctx.setWritePath(getWritePath());
+ hmsTransaction.beginInsertTable(ctx);
+ hmsTransaction.finishInsertTable(new SimpleTableInfo(dbName,
tbWithoutPartition));
+ hmsTransaction.commit();
+ Assert.fail();
+ } catch (Throwable t) {
+ Assert.assertTrue(t.getMessage().contains("failed to do nothing"));
+ }
+
+ try {
+ hmsTransaction.rollback();
+ } catch (Throwable t) {
+ Assert.fail();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]