This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1a02c571a9 [Improve] StarRocksSourceReader use the existing client
(#6480)
1a02c571a9 is described below
commit 1a02c571a98d02d95895e598b958b31afb832439
Author: xiaochen <[email protected]>
AuthorDate: Tue Mar 12 11:08:09 2024 +0800
[Improve] StarRocksSourceReader use the existing client (#6480)
---
.../client/source/StarRocksBeReadClient.java | 25 +++++--------
.../starrocks/source/StarRocksSourceReader.java | 42 +++++++++++++++++-----
2 files changed, 41 insertions(+), 26 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
index fdd240b4c5..07a5a03eba 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
@@ -40,7 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode.CLOSE_BE_READER_FAILED;
@@ -55,21 +55,12 @@ public class StarRocksBeReadClient implements Serializable {
private String contextId;
private int readerOffset = 0;
private final SourceConfig sourceConfig;
- private final SeaTunnelRowType seaTunnelRowType;
+ private SeaTunnelRowType seaTunnelRowType;
private StarRocksRowBatchReader rowBatch;
-
- private final List<Long> tabletIds;
-
- private final String queryPlan;
protected AtomicBoolean eos = new AtomicBoolean(false);
- public StarRocksBeReadClient(
- QueryPartition queryPartition,
- SourceConfig sourceConfig,
- SeaTunnelRowType seaTunnelRowType) {
+ public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig)
{
this.sourceConfig = sourceConfig;
- this.seaTunnelRowType = seaTunnelRowType;
- String beNodeInfo = queryPartition.getBeAddress();
log.debug("Parse StarRocks BE address: '{}'.", beNodeInfo);
String[] hostPort = beNodeInfo.split(":");
if (hostPort.length != 2) {
@@ -79,8 +70,6 @@ public class StarRocksBeReadClient implements Serializable {
}
this.ip = hostPort[0].trim();
this.port = Integer.parseInt(hostPort[1].trim());
- this.queryPlan = queryPartition.getQueryPlan();
- this.tabletIds = new ArrayList<>(queryPartition.getTabletIds());
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
TSocket socket =
new TSocket(
@@ -101,10 +90,12 @@ public class StarRocksBeReadClient implements Serializable
{
client = new TStarrocksExternalService.Client(protocol);
}
- public void openScanner() {
+ public void openScanner(QueryPartition partition, SeaTunnelRowType
seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ Set<Long> tabletIds = partition.getTabletIds();
TScanOpenParams params = new TScanOpenParams();
- params.setTablet_ids(tabletIds);
- params.setOpaqued_query_plan(queryPlan);
+ params.setTablet_ids(new ArrayList<>(tabletIds));
+ params.setOpaqued_query_plan(partition.getQueryPlan());
params.setCluster(DEFAULT_CLUSTER_NAME);
params.setDatabase(sourceConfig.getDatabase());
params.setTable(sourceConfig.getTable());
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
index 9ccd02b554..7f68d4e321 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java
@@ -23,14 +23,18 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.StarRocksBeReadClient;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
@Slf4j
@@ -40,6 +44,7 @@ public class StarRocksSourceReader implements
SourceReader<SeaTunnelRow, StarRoc
private final SourceReader.Context context;
private final SourceConfig sourceConfig;
private final SeaTunnelRowType seaTunnelRowType;
+ private Map<String, StarRocksBeReadClient> clientsPools;
private volatile boolean noMoreSplitsAssignment;
public StarRocksSourceReader(
@@ -87,26 +92,45 @@ public class StarRocksSourceReader implements
SourceReader<SeaTunnelRow, StarRoc
}
private void read(StarRocksSourceSplit split, Collector<SeaTunnelRow>
output) {
- StarRocksBeReadClient client =
- new StarRocksBeReadClient(split.getPartition(), sourceConfig,
seaTunnelRowType);
+
+ QueryPartition partition = split.getPartition();
+ String beAddress = partition.getBeAddress();
+ StarRocksBeReadClient client = null;
+ if (clientsPools.containsKey(beAddress)) {
+ client = clientsPools.get(beAddress);
+ } else {
+ client = new StarRocksBeReadClient(beAddress, sourceConfig);
+ clientsPools.put(beAddress, client);
+ }
// open scanner to be
- client.openScanner();
+ client.openScanner(partition, seaTunnelRowType);
while (client.hasNext()) {
SeaTunnelRow seaTunnelRow = client.getNext();
output.collect(seaTunnelRow);
}
- // close client to be
- if (client != null) {
- client.close();
- }
}
@Override
- public void open() throws Exception {}
+ public void open() throws Exception {
+ clientsPools = new HashMap<>();
+ }
@Override
public void close() throws IOException {
- // nothing to do
+ if (!clientsPools.isEmpty()) {
+ clientsPools
+ .values()
+ .forEach(
+ client -> {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (StarRocksConnectorException e) {
+ log.error("Failed to close reader: ",
e);
+ }
+ }
+ });
+ }
}
@Override