This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/2.1-tmp by this push:
     new 77ad3f6a193 [feature](hive)Get updated information from coordinator and 
commit (#32441) (#33466)
77ad3f6a193 is described below

commit 77ad3f6a1933ec8c26e28176399c9131baec2df4
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Apr 10 15:07:18 2024 +0800

    [feature](hive)Get updated information from coordinator and commit (#32441) 
(#33466)
    
    issue: #31442
    1. Get updated information from the coordinator and commit it
    2. Refresh the table after the commit
    
    Co-authored-by: wuwenchi <[email protected]>
---
 .../doris/datasource/hive/HiveMetadataOps.java     |  9 ++++++++
 .../plans/commands/insert/HiveInsertExecutor.java  | 10 +++++++++
 .../main/java/org/apache/doris/qe/Coordinator.java | 26 ++++++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 091cd51b232..9279c48fbaa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -186,6 +186,15 @@ public class HiveMetadataOps implements 
ExternalMetadataOps {
         Table table = client.getTable(dbName, tableName);
         HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table);
         hmsCommitter.commit(hivePUs);
+        try {
+            Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
+                    dbName,
+                    tableName,
+                    catalog.getName(),
+                    true);
+        } catch (DdlException e) {
+            LOG.warn("Failed to refresh table {}.{} : {}", dbName, tableName, 
e.getMessage());
+        }
     }
 
     public void updateTableStatistics(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index fc6b7a776b0..b054a8b5878 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -20,7 +20,10 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.datasource.operations.ExternalMetadataOps;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -32,12 +35,14 @@ import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -88,6 +93,11 @@ public class HiveInsertExecutor extends 
AbstractInsertExecutor {
         if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
             LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
         } else {
+            // TODO use transaction
+            List<THivePartitionUpdate> ups = 
coordinator.getHivePartitionUpdates();
+            ExternalCatalog catalog = ((HMSExternalTable) table).getCatalog();
+            ExternalMetadataOps metadataOps = catalog.getMetadataOps();
+            ((HiveMetadataOps) metadataOps).commit(((HMSExternalTable) 
table).getDbName(), table.getName(), ups);
             txnStatus = TransactionStatus.COMMITTED;
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 44e0f4df16d..1b99da152f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -88,6 +88,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -230,6 +231,9 @@ public class Coordinator implements CoordInterface {
     private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
     private final List<TErrorTabletInfo> errorTabletInfos = 
Lists.newArrayList();
 
+    // TODO moved to ExternalTransactionManager
+    private final List<THivePartitionUpdate> hivePartitionUpdates = 
Lists.newArrayList();
+
     // Input parameter
     private long jobId = -1; // job which this task belongs to
     private TUniqueId queryId;
@@ -486,6 +490,10 @@ public class Coordinator implements CoordInterface {
         return errorTabletInfos;
     }
 
+    public List<THivePartitionUpdate> getHivePartitionUpdates() {
+        return hivePartitionUpdates;
+    }
+
     public Map<String, Integer> getBeToInstancesNum() {
         Map<String, Integer> result = Maps.newTreeMap();
         if (enablePipelineEngine) {
@@ -2416,6 +2424,15 @@ public class Coordinator implements CoordInterface {
         // TODO: more ranges?
     }
 
+    private void updateHivePartitionUpdates(List<THivePartitionUpdate> 
hivePartitionUpdates) {
+        lock.lock();
+        try {
+            this.hivePartitionUpdates.addAll(hivePartitionUpdates);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     // update job progress from BE
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (enablePipelineXEngine) {
@@ -2464,6 +2481,9 @@ public class Coordinator implements CoordInterface {
             if (params.isSetErrorTabletInfos()) {
                 updateErrorTabletInfos(params.getErrorTabletInfos());
             }
+            if (params.isSetHivePartitionUpdates()) {
+                updateHivePartitionUpdates(params.getHivePartitionUpdates());
+            }
 
             Preconditions.checkArgument(params.isSetDetailedReport());
             if (ctx.done) {
@@ -2529,6 +2549,9 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetErrorTabletInfos()) {
                     updateErrorTabletInfos(params.getErrorTabletInfos());
                 }
+                if (params.isSetHivePartitionUpdates()) {
+                    
updateHivePartitionUpdates(params.getHivePartitionUpdates());
+                }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Query {} instance {} is marked done",
                             DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()));
@@ -2606,6 +2629,9 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetErrorTabletInfos()) {
                     updateErrorTabletInfos(params.getErrorTabletInfos());
                 }
+                if (params.isSetHivePartitionUpdates()) {
+                    
updateHivePartitionUpdates(params.getHivePartitionUpdates());
+                }
                 
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to