This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1a02c571a9 [Improve] StarRocksSourceReader  use the existing client  
(#6480)
1a02c571a9 is described below

commit 1a02c571a98d02d95895e598b958b31afb832439
Author: xiaochen <[email protected]>
AuthorDate: Tue Mar 12 11:08:09 2024 +0800

    [Improve] StarRocksSourceReader  use the existing client  (#6480)
---
 .../client/source/StarRocksBeReadClient.java       | 25 +++++--------
 .../starrocks/source/StarRocksSourceReader.java    | 42 +++++++++++++++++-----
 2 files changed, 41 insertions(+), 26 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
index fdd240b4c5..07a5a03eba 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
@@ -40,7 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode.CLOSE_BE_READER_FAILED;
@@ -55,21 +55,12 @@ public class StarRocksBeReadClient implements Serializable {
     private String contextId;
     private int readerOffset = 0;
     private final SourceConfig sourceConfig;
-    private final SeaTunnelRowType seaTunnelRowType;
+    private SeaTunnelRowType seaTunnelRowType;
     private StarRocksRowBatchReader rowBatch;
-
-    private final List<Long> tabletIds;
-
-    private final String queryPlan;
     protected AtomicBoolean eos = new AtomicBoolean(false);
 
-    public StarRocksBeReadClient(
-            QueryPartition queryPartition,
-            SourceConfig sourceConfig,
-            SeaTunnelRowType seaTunnelRowType) {
+    public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) 
{
         this.sourceConfig = sourceConfig;
-        this.seaTunnelRowType = seaTunnelRowType;
-        String beNodeInfo = queryPartition.getBeAddress();
         log.debug("Parse StarRocks BE address: '{}'.", beNodeInfo);
         String[] hostPort = beNodeInfo.split(":");
         if (hostPort.length != 2) {
@@ -79,8 +70,6 @@ public class StarRocksBeReadClient implements Serializable {
         }
         this.ip = hostPort[0].trim();
         this.port = Integer.parseInt(hostPort[1].trim());
-        this.queryPlan = queryPartition.getQueryPlan();
-        this.tabletIds = new ArrayList<>(queryPartition.getTabletIds());
         TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
         TSocket socket =
                 new TSocket(
@@ -101,10 +90,12 @@ public class StarRocksBeReadClient implements Serializable 
{
         client = new TStarrocksExternalService.Client(protocol);
     }
 
-    public void openScanner() {
+    public void openScanner(QueryPartition partition, SeaTunnelRowType 
seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        Set<Long> tabletIds = partition.getTabletIds();
         TScanOpenParams params = new TScanOpenParams();
-        params.setTablet_ids(tabletIds);
-        params.setOpaqued_query_plan(queryPlan);
+        params.setTablet_ids(new ArrayList<>(tabletIds));
+        params.setOpaqued_query_plan(partition.getQueryPlan());
         params.setCluster(DEFAULT_CLUSTER_NAME);
         params.setDatabase(sourceConfig.getDatabase());
         params.setTable(sourceConfig.getTable());
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
index 9ccd02b554..7f68d4e321 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
@@ -23,14 +23,18 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.StarRocksBeReadClient;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
 @Slf4j
@@ -40,6 +44,7 @@ public class StarRocksSourceReader implements 
SourceReader<SeaTunnelRow, StarRoc
     private final SourceReader.Context context;
     private final SourceConfig sourceConfig;
     private final SeaTunnelRowType seaTunnelRowType;
+    private Map<String, StarRocksBeReadClient> clientsPools;
     private volatile boolean noMoreSplitsAssignment;
 
     public StarRocksSourceReader(
@@ -87,26 +92,45 @@ public class StarRocksSourceReader implements 
SourceReader<SeaTunnelRow, StarRoc
     }
 
     private void read(StarRocksSourceSplit split, Collector<SeaTunnelRow> 
output) {
-        StarRocksBeReadClient client =
-                new StarRocksBeReadClient(split.getPartition(), sourceConfig, 
seaTunnelRowType);
+
+        QueryPartition partition = split.getPartition();
+        String beAddress = partition.getBeAddress();
+        StarRocksBeReadClient client = null;
+        if (clientsPools.containsKey(beAddress)) {
+            client = clientsPools.get(beAddress);
+        } else {
+            client = new StarRocksBeReadClient(beAddress, sourceConfig);
+            clientsPools.put(beAddress, client);
+        }
         // open scanner to be
-        client.openScanner();
+        client.openScanner(partition, seaTunnelRowType);
         while (client.hasNext()) {
             SeaTunnelRow seaTunnelRow = client.getNext();
             output.collect(seaTunnelRow);
         }
-        // close client to be
-        if (client != null) {
-            client.close();
-        }
     }
 
     @Override
-    public void open() throws Exception {}
+    public void open() throws Exception {
+        clientsPools = new HashMap<>();
+    }
 
     @Override
     public void close() throws IOException {
-        // nothing to do
+        if (!clientsPools.isEmpty()) {
+            clientsPools
+                    .values()
+                    .forEach(
+                            client -> {
+                                if (client != null) {
+                                    try {
+                                        client.close();
+                                    } catch (StarRocksConnectorException e) {
+                                        log.error("Failed to close reader: ", 
e);
+                                    }
+                                }
+                            });
+        }
     }
 
     @Override

Reply via email to