This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/2.1-tmp by this push:
new 77ad3f6a193 [feature](hive)Get updated information from coordinate and
commit (#32441) (#33466)
77ad3f6a193 is described below
commit 77ad3f6a1933ec8c26e28176399c9131baec2df4
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Apr 10 15:07:18 2024 +0800
[feature](hive)Get updated information from coordinate and commit (#32441)
(#33466)
issue: #31442
1. Get the updated partition information from the coordinator and commit it
2. Refresh the table after commit
Co-authored-by: wuwenchi <[email protected]>
---
.../doris/datasource/hive/HiveMetadataOps.java | 9 ++++++++
.../plans/commands/insert/HiveInsertExecutor.java | 10 +++++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 26 ++++++++++++++++++++++
3 files changed, 45 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 091cd51b232..9279c48fbaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -186,6 +186,15 @@ public class HiveMetadataOps implements ExternalMetadataOps {
Table table = client.getTable(dbName, tableName);
HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table);
hmsCommitter.commit(hivePUs);
+ try {
+ Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
+ dbName,
+ tableName,
+ catalog.getName(),
+ true);
+ } catch (DdlException e) {
+ LOG.warn("Failed to refresh table {}.{} : {}", dbName, tableName, e.getMessage());
+ }
}
public void updateTableStatistics(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index fc6b7a776b0..b054a8b5878 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -20,7 +20,10 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -32,12 +35,14 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Optional;
/**
@@ -88,6 +93,11 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
} else {
+ // TODO use transaction
+ List<THivePartitionUpdate> ups = coordinator.getHivePartitionUpdates();
+ ExternalCatalog catalog = ((HMSExternalTable) table).getCatalog();
+ ExternalMetadataOps metadataOps = catalog.getMetadataOps();
+ ((HiveMetadataOps) metadataOps).commit(((HMSExternalTable) table).getDbName(), table.getName(), ups);
txnStatus = TransactionStatus.COMMITTED;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 44e0f4df16d..1b99da152f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -88,6 +88,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -230,6 +231,9 @@ public class Coordinator implements CoordInterface {
private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
private final List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
+ // TODO moved to ExternalTransactionManager
+ private final List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
+
// Input parameter
private long jobId = -1; // job which this task belongs to
private TUniqueId queryId;
@@ -486,6 +490,10 @@ public class Coordinator implements CoordInterface {
return errorTabletInfos;
}
+ public List<THivePartitionUpdate> getHivePartitionUpdates() {
+ return hivePartitionUpdates;
+ }
+
public Map<String, Integer> getBeToInstancesNum() {
Map<String, Integer> result = Maps.newTreeMap();
if (enablePipelineEngine) {
@@ -2416,6 +2424,15 @@ public class Coordinator implements CoordInterface {
// TODO: more ranges?
}
+ private void updateHivePartitionUpdates(List<THivePartitionUpdate> hivePartitionUpdates) {
+ lock.lock();
+ try {
+ this.hivePartitionUpdates.addAll(hivePartitionUpdates);
+ } finally {
+ lock.unlock();
+ }
+ }
+
// update job progress from BE
public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (enablePipelineXEngine) {
@@ -2464,6 +2481,9 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
+ if (params.isSetHivePartitionUpdates()) {
+ updateHivePartitionUpdates(params.getHivePartitionUpdates());
+ }
Preconditions.checkArgument(params.isSetDetailedReport());
if (ctx.done) {
@@ -2529,6 +2549,9 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
+ if (params.isSetHivePartitionUpdates()) {
+ updateHivePartitionUpdates(params.getHivePartitionUpdates());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} instance {} is marked done",
DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
@@ -2606,6 +2629,9 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
+ if (params.isSetHivePartitionUpdates()) {
+ updateHivePartitionUpdates(params.getHivePartitionUpdates());
+ }
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]