This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_syxj_2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit bdb818698d08a180109150e93b07e7114f4e1b05
Author: HappenLee <[email protected]>
AuthorDate: Thu Aug 10 18:34:45 2023 +0800

    [pipeline](exec) Support shared scan in jdbc and odbc scan node (#22826)
    
    Support shared scan in jdbc and odbc scan node to improve exec performance
---
 be/src/exec/exec_node.cpp                                         | 2 ++
 be/src/pipeline/pipeline_fragment_context.cpp                     | 1 +
 .../main/java/org/apache/doris/planner/DistributedPlanner.java    | 8 +-------
 .../java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java | 4 +++-
 .../java/org/apache/doris/planner/external/odbc/OdbcScanNode.java | 7 +++++++
 5 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index bcb0771eda..9a9c464b99 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -498,6 +498,8 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
     collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes);
     collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes);
     collect_nodes(TPlanNodeType::META_SCAN_NODE, nodes);
+    collect_nodes(TPlanNodeType::JDBC_SCAN_NODE, nodes);
+    collect_nodes(TPlanNodeType::ODBC_SCAN_NODE, nodes);
 }
 
 void ExecNode::init_runtime_profile(const std::string& name) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 008d52c088..86e01ef3b5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -100,6 +100,7 @@
 #include "vec/exec/join/vhash_join_node.h"
 #include "vec/exec/scan/new_es_scan_node.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_jdbc_scan_node.h"
 #include "vec/exec/scan/new_odbc_scan_node.h"
 #include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/scan/vmeta_scan_node.h"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 75ca717050..a719081496 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -38,8 +38,6 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
-import org.apache.doris.planner.external.jdbc.JdbcScanNode;
-import org.apache.doris.planner.external.odbc.OdbcScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TPartitionType;
@@ -281,12 +279,8 @@ public class DistributedPlanner {
      * TODO: hbase scans are range-partitioned on the row key
      */
     private PlanFragment createScanFragment(PlanNode node) throws UserException {
-        if (node instanceof MysqlScanNode || node instanceof OdbcScanNode || node instanceof JdbcScanNode) {
+        if (node instanceof MysqlScanNode) {
             return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
-        } else if (node instanceof SchemaScanNode) {
-            return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM);
-        } else if (node instanceof DataGenScanNode) {
-            return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM);
         } else if (node instanceof OlapScanNode) {
             // olap scan node
             OlapScanNode olapScanNode = (OlapScanNode) node;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java
index 3bcbfb8b4d..4854ce5a00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.ExternalScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsRecursiveDerive;
 import org.apache.doris.statistics.query.StatsDelta;
@@ -272,7 +273,8 @@ public class JdbcScanNode extends ExternalScanNode {
 
     @Override
     public int getNumInstances() {
-        return 1;
+        return ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
+            ? ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() : 1;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java
index 2bac81218f..832922ef81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/odbc/OdbcScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.ExternalScanNode;
 import org.apache.doris.planner.external.jdbc.JdbcScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsRecursiveDerive;
 import org.apache.doris.statistics.query.StatsDelta;
@@ -213,4 +214,10 @@ public class OdbcScanNode extends ExternalScanNode {
                 Env.getCurrentEnv().getCurrentCatalog().getDbOrAnalysisException(tbl.getQualifiedDbName()).getId(),
                 tbl.getId(), -1L);
     }
+
+    @Override
+    public int getNumInstances() {
+        return ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
+            ? ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() : 1;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to