This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new aa0b2119a7 [Improve] Add disable 2pc in SelectDB cloud sink (#6266)
aa0b2119a7 is described below
commit aa0b2119a735effd33edba1559f0c2a755b5b2f7
Author: Jia Fan <[email protected]>
AuthorDate: Fri Feb 23 18:50:30 2024 +0800
[Improve] Add disable 2pc in SelectDB cloud sink (#6266)
---
docs/en/connector-v2/sink/SelectDB-Cloud.md | 34 +++---
.../connectors/selectdb/config/SelectDBConfig.java | 11 ++
.../CopySQLUtil.java} | 61 +++--------
.../selectdb/sink/committer/SelectDBCommitter.java | 119 +--------------------
.../selectdb/sink/writer/SelectDBSinkWriter.java | 15 ++-
.../selectdb/sink/writer/SelectDBStageLoad.java | 27 ++---
.../connectors/selectdb/util/HttpUtil.java | 2 +-
7 files changed, 67 insertions(+), 202 deletions(-)
diff --git a/docs/en/connector-v2/sink/SelectDB-Cloud.md
b/docs/en/connector-v2/sink/SelectDB-Cloud.md
index 6ad2997903..41ca0ddaf2 100644
--- a/docs/en/connector-v2/sink/SelectDB-Cloud.md
+++ b/docs/en/connector-v2/sink/SelectDB-Cloud.md
@@ -30,19 +30,20 @@ Version Supported
## Sink Options
-| Name | Type | Required | Default |
Description
|
-|--------------------|--------|----------|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|
-| load-url | String | Yes | - | `SelectDB
Cloud` warehouse http address, the format is `warehouse_ip:http_port`
|
-| jdbc-url | String | Yes | - | `SelectDB
Cloud` warehouse jdbc address, the format is `warehouse_ip:mysql_port`
|
-| cluster-name | String | Yes | - | `SelectDB
Cloud` cluster name
|
-| username | String | Yes | - | `SelectDB
Cloud` user username
|
-| password | String | Yes | - | `SelectDB
Cloud` user password
|
-| table.identifier | String | Yes | - | The name
of `SelectDB Cloud` table, the format is `database.table`
|
-| sink.enable-delete | bool | No | false | Whether to
enable deletion. This option requires SelectDB Cloud table to enable batch
delete function, and only supports Unique model. |
-| sink.max-retries | int | No | 3 | the max
retry times if writing records to database failed
|
-| sink.buffer-size | int | No | 10 * 1024 * 1024 (1MB) | the buffer
size to cache data for stream load.
|
-| sink.buffer-count | int | No | 10000 | the buffer
count to cache data for stream load.
|
-| selectdb.config | map | yes | - | This
option is used to support operations such as `insert`, `delete`, and `update`
when automatically generate sql,and supported formats. |
+| Name | Type | Required | Default |
Description
|
+|--------------------|--------|----------|------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| load-url | String | Yes | - | `SelectDB
Cloud` warehouse http address, the format is `warehouse_ip:http_port`
|
+| jdbc-url | String | Yes | - | `SelectDB
Cloud` warehouse jdbc address, the format is `warehouse_ip:mysql_port`
|
+| cluster-name | String | Yes | - | `SelectDB
Cloud` cluster name
|
+| username | String | Yes | - | `SelectDB
Cloud` user username
|
+| password | String | Yes | - | `SelectDB
Cloud` user password
|
+| sink.enable-2pc | bool | No | true | Whether to
enable two-phase commit (2pc), the default is true, to ensure Exactly-Once
semantics. SelectDB uses cache files to load data. When the amount of data is
large, cached data may become invalid (the default expiration time is 1 hour).
If you encounter a large amount of data write loss, please configure
sink.enable-2pc to false. |
+| table.identifier | String | Yes | - | The name
of `SelectDB Cloud` table, the format is `database.table`
|
+| sink.enable-delete | bool | No | false | Whether to
enable deletion. This option requires SelectDB Cloud table to enable batch
delete function, and only supports Unique model.
|
+| sink.max-retries | int | No | 3 | the max
retry times if writing records to database failed
|
+| sink.buffer-size | int | No | 10 * 1024 * 1024 (1MB) | the buffer
size to cache data for stream load.
|
+| sink.buffer-count | int | No | 10000 | the buffer
count to cache data for stream load.
|
+| selectdb.config | map | yes | - | This
option is used to support operations such as `insert`, `delete`, and `update`
when automatically generate sql,and supported formats.
|
## Data Type Mapping
@@ -170,10 +171,3 @@ sink {
}
```
-## Changelog
-
-### next version
-
-- [Feature] Support SelectDB Cloud Sink Connector
[3958](https://github.com/apache/seatunnel/pull/3958)
-- [Improve] Refactor some SelectDB Cloud Sink code as well as support copy
into batch and async flush and cdc
[4312](https://github.com/apache/seatunnel/pull/4312)
-
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
index 7025387364..50c3442c6b 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
@@ -71,6 +71,11 @@ public class SelectDBConfig {
.noDefaultValue()
.withDescription("the jdbc password.");
+ public static final Option<Boolean> SINK_ENABLE_2PC =
+ Options.key("sink.enable-2pc")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("enable 2PC while loading");
// sink config options
public static final Option<Integer> SINK_MAX_RETRIES =
Options.key("sink.max-retries")
@@ -120,6 +125,7 @@ public class SelectDBConfig {
private String tableIdentifier;
private Boolean enableDelete;
private String labelPrefix;
+ private boolean enable2PC;
private Integer maxRetries;
private Integer bufferSize;
private Integer bufferCount;
@@ -146,6 +152,11 @@ public class SelectDBConfig {
} else {
selectdbConfig.setMaxRetries(SINK_MAX_RETRIES.defaultValue());
}
+ if (pluginConfig.hasPath(SINK_ENABLE_2PC.key())) {
+
selectdbConfig.setEnable2PC(pluginConfig.getBoolean(SINK_ENABLE_2PC.key()));
+ } else {
+ selectdbConfig.setEnable2PC(SINK_ENABLE_2PC.defaultValue());
+ }
if (pluginConfig.hasPath(SINK_BUFFER_SIZE.key())) {
selectdbConfig.setBufferSize(pluginConfig.getInt(SINK_BUFFER_SIZE.key()));
} else {
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopySQLUtil.java
similarity index 71%
copy from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
copy to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopySQLUtil.java
index 7c210be845..48e19b1b1a 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopySQLUtil.java
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.selectdb.sink.committer;
+package org.apache.seatunnel.connectors.selectdb.rest;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
-import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
-import org.apache.seatunnel.connectors.selectdb.rest.CopyIntoResp;
import org.apache.seatunnel.connectors.selectdb.sink.writer.LoadStatus;
import org.apache.seatunnel.connectors.selectdb.util.HttpPostBuilder;
import org.apache.seatunnel.connectors.selectdb.util.HttpUtil;
@@ -40,53 +35,21 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
@Slf4j
-public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
+public class CopySQLUtil {
+
private static final String COMMIT_PATTERN = "http://%s/copy/query";
private static final int HTTP_TEMPORARY_REDIRECT = 200;
- private final ObjectMapper objectMapper = new ObjectMapper();
- private final CloseableHttpClient httpClient;
- private final SelectDBConfig selectdbConfig;
- int maxRetry;
-
- public SelectDBCommitter(Config pluginConfig) {
- this(
- SelectDBConfig.loadConfig(pluginConfig),
- SelectDBConfig.loadConfig(pluginConfig).getMaxRetries(),
- new HttpUtil().getHttpClient());
- }
-
- public SelectDBCommitter(
- SelectDBConfig selectdbConfig, int maxRetry, CloseableHttpClient
client) {
- this.selectdbConfig = selectdbConfig;
- this.maxRetry = maxRetry;
- this.httpClient = client;
- }
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- @Override
- public List<SelectDBCommitInfo> commit(List<SelectDBCommitInfo>
commitInfos)
+ public static void copyFileToDatabase(
+ SelectDBConfig selectdbConfig, String clusterName, String copySQL,
String hostPort)
throws IOException {
- for (SelectDBCommitInfo committable : commitInfos) {
- commitTransaction(committable);
- }
- return Collections.emptyList();
- }
-
- @Override
- public void abort(List<SelectDBCommitInfo> commitInfos) throws IOException
{}
-
- private void commitTransaction(SelectDBCommitInfo commitInfo) throws
IOException {
long start = System.currentTimeMillis();
- String hostPort = commitInfo.getHostPort();
- String clusterName = commitInfo.getClusterName();
- String copySQL = commitInfo.getCopySQL();
- log.info("commit to cluster {} with copy sql: {}", clusterName,
copySQL);
-
+ CloseableHttpClient httpClient = HttpUtil.getHttpClient();
int statusCode = -1;
String reasonPhrase = null;
int retry = 0;
@@ -96,12 +59,12 @@ public class SelectDBCommitter implements
SinkCommitter<SelectDBCommitInfo> {
boolean success = false;
CloseableHttpResponse response;
String loadResult = "";
- while (retry++ <= maxRetry) {
+ while (retry++ <= selectdbConfig.getMaxRetries()) {
HttpPostBuilder postBuilder = new HttpPostBuilder();
postBuilder
.setUrl(String.format(COMMIT_PATTERN, hostPort))
.baseAuth(selectdbConfig.getUsername(),
selectdbConfig.getPassword())
- .setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
+ .setEntity(new
StringEntity(OBJECT_MAPPER.writeValueAsString(params)));
try {
response = httpClient.execute(postBuilder.build());
} catch (IOException e) {
@@ -135,7 +98,7 @@ public class SelectDBCommitter implements
SinkCommitter<SelectDBCommitInfo> {
throw new SelectDBConnectorException(
SelectDBConnectorErrorCode.COMMIT_FAILED,
"commit failed with SQL: "
- + commitInfo.getCopySQL()
+ + copySQL
+ " Commit error with status: "
+ statusCode
+ ", Reason: "
@@ -145,9 +108,9 @@ public class SelectDBCommitter implements
SinkCommitter<SelectDBCommitInfo> {
}
}
- public boolean handleCommitResponse(String loadResult) throws IOException {
+ private static boolean handleCommitResponse(String loadResult) throws
IOException {
BaseResponse<CopyIntoResp> baseResponse =
- objectMapper.readValue(
+ OBJECT_MAPPER.readValue(
loadResult, new
TypeReference<BaseResponse<CopyIntoResp>>() {});
if (baseResponse.getCode() == LoadStatus.SUCCESS) {
CopyIntoResp dataResp = baseResponse.getData();
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
index 7c210be845..9fea3e2ebf 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
@@ -21,51 +21,24 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
-import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
-import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
-import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
-import org.apache.seatunnel.connectors.selectdb.rest.CopyIntoResp;
-import org.apache.seatunnel.connectors.selectdb.sink.writer.LoadStatus;
-import org.apache.seatunnel.connectors.selectdb.util.HttpPostBuilder;
-import org.apache.seatunnel.connectors.selectdb.util.HttpUtil;
-import org.apache.seatunnel.connectors.selectdb.util.ResponseUtil;
+import org.apache.seatunnel.connectors.selectdb.rest.CopySQLUtil;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
@Slf4j
public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
- private static final String COMMIT_PATTERN = "http://%s/copy/query";
- private static final int HTTP_TEMPORARY_REDIRECT = 200;
- private final ObjectMapper objectMapper = new ObjectMapper();
- private final CloseableHttpClient httpClient;
private final SelectDBConfig selectdbConfig;
- int maxRetry;
public SelectDBCommitter(Config pluginConfig) {
- this(
- SelectDBConfig.loadConfig(pluginConfig),
- SelectDBConfig.loadConfig(pluginConfig).getMaxRetries(),
- new HttpUtil().getHttpClient());
+ this(SelectDBConfig.loadConfig(pluginConfig));
}
- public SelectDBCommitter(
- SelectDBConfig selectdbConfig, int maxRetry, CloseableHttpClient
client) {
+ public SelectDBCommitter(SelectDBConfig selectdbConfig) {
this.selectdbConfig = selectdbConfig;
- this.maxRetry = maxRetry;
- this.httpClient = client;
}
@Override
@@ -78,95 +51,13 @@ public class SelectDBCommitter implements
SinkCommitter<SelectDBCommitInfo> {
}
@Override
- public void abort(List<SelectDBCommitInfo> commitInfos) throws IOException
{}
+ public void abort(List<SelectDBCommitInfo> commitInfos) {}
private void commitTransaction(SelectDBCommitInfo commitInfo) throws
IOException {
- long start = System.currentTimeMillis();
String hostPort = commitInfo.getHostPort();
String clusterName = commitInfo.getClusterName();
String copySQL = commitInfo.getCopySQL();
log.info("commit to cluster {} with copy sql: {}", clusterName,
copySQL);
-
- int statusCode = -1;
- String reasonPhrase = null;
- int retry = 0;
- Map<String, String> params = new HashMap<>();
- params.put("cluster", clusterName);
- params.put("sql", copySQL);
- boolean success = false;
- CloseableHttpResponse response;
- String loadResult = "";
- while (retry++ <= maxRetry) {
- HttpPostBuilder postBuilder = new HttpPostBuilder();
- postBuilder
- .setUrl(String.format(COMMIT_PATTERN, hostPort))
- .baseAuth(selectdbConfig.getUsername(),
selectdbConfig.getPassword())
- .setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
- try {
- response = httpClient.execute(postBuilder.build());
- } catch (IOException e) {
- log.error("commit error : ", e);
- continue;
- }
- statusCode = response.getStatusLine().getStatusCode();
- reasonPhrase = response.getStatusLine().getReasonPhrase();
- if (statusCode != HTTP_TEMPORARY_REDIRECT) {
- log.warn(
- "commit failed with status {} {}, reason {}",
- statusCode,
- hostPort,
- reasonPhrase);
- } else if (response.getEntity() != null) {
- loadResult = EntityUtils.toString(response.getEntity());
- success = handleCommitResponse(loadResult);
- if (success) {
- log.info(
- "commit success cost {}ms, response is {}",
- System.currentTimeMillis() - start,
- loadResult);
- break;
- } else {
- log.warn("commit failed, retry again");
- }
- }
- }
-
- if (!success) {
- throw new SelectDBConnectorException(
- SelectDBConnectorErrorCode.COMMIT_FAILED,
- "commit failed with SQL: "
- + commitInfo.getCopySQL()
- + " Commit error with status: "
- + statusCode
- + ", Reason: "
- + reasonPhrase
- + ", Response: "
- + loadResult);
- }
- }
-
- public boolean handleCommitResponse(String loadResult) throws IOException {
- BaseResponse<CopyIntoResp> baseResponse =
- objectMapper.readValue(
- loadResult, new
TypeReference<BaseResponse<CopyIntoResp>>() {});
- if (baseResponse.getCode() == LoadStatus.SUCCESS) {
- CopyIntoResp dataResp = baseResponse.getData();
- if (LoadStatus.FAIL.equals(dataResp.getDataCode())) {
- log.error("copy into execute failed, reason:{}", loadResult);
- return false;
- } else {
- Map<String, String> result = dataResp.getResult();
- if (!result.get("state").equals("FINISHED")
- && !ResponseUtil.isCommitted(result.get("msg"))) {
- log.error("copy into load failed, reason:{}", loadResult);
- return false;
- } else {
- return true;
- }
- }
- } else {
- log.error("commit failed, reason:{}", loadResult);
- return false;
- }
+ CopySQLUtil.copyFileToDatabase(selectdbConfig, clusterName, copySQL,
hostPort);
}
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
index 8a0bd04400..43420d8ac7 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
@@ -98,7 +98,7 @@ public class SelectDBSinkWriter
}
@Override
- public synchronized Optional<SelectDBCommitInfo> prepareCommit() throws
IOException {
+ public synchronized Optional<SelectDBCommitInfo> prepareCommit() {
checkState(selectDBStageLoad != null);
log.info("checkpoint arrived, upload buffer to storage");
try {
@@ -106,6 +106,10 @@ public class SelectDBSinkWriter
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
+ if (!selectdbConfig.isEnable2PC()) {
+ return Optional.empty();
+ }
+
CopySQLBuilder copySQLBuilder =
new CopySQLBuilder(selectdbConfig,
selectDBStageLoad.getFileList());
String copySql = copySQLBuilder.buildCopySQL();
@@ -115,11 +119,12 @@ public class SelectDBSinkWriter
}
@Override
- public synchronized List<SelectDBSinkState> snapshotState(long
checkpointId)
- throws IOException {
+ public synchronized List<SelectDBSinkState> snapshotState(long
checkpointId) {
checkState(selectDBStageLoad != null);
- log.info("clear the file list {}", selectDBStageLoad.getFileList());
- this.selectDBStageLoad.clearFileList();
+ if (selectdbConfig.isEnable2PC()) {
+ log.info("clear the file list {}",
selectDBStageLoad.getFileList());
+ this.selectDBStageLoad.clearFileList();
+ }
this.selectDBStageLoad.setCurrentCheckpointID(checkpointId + 1);
return Collections.singletonList(selectdbSinkState);
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
index 44aaf89772..7bc2f8c158 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connectors.selectdb.rest.CopySQLUtil;
import org.apache.seatunnel.connectors.selectdb.util.HttpPutBuilder;
import org.apache.http.Header;
@@ -32,8 +33,6 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -57,7 +56,6 @@ import static
org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants
@Slf4j
public class SelectDBStageLoad implements Serializable {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final LabelGenerator labelGenerator;
private final String lineDelimiter;
private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
@@ -67,8 +65,6 @@ public class SelectDBStageLoad implements Serializable {
private String hostPort;
private final String username;
private final String password;
- private final String db;
- private final String table;
private final Properties stageLoadProps;
private List<String> fileList = new CopyOnWriteArrayList();
private RecordBuffer buffer;
@@ -84,9 +80,6 @@ public class SelectDBStageLoad implements Serializable {
public SelectDBStageLoad(SelectDBConfig selectdbConfig, LabelGenerator
labelGenerator) {
this.selectdbConfig = selectdbConfig;
this.hostPort = selectdbConfig.getLoadUrl();
- String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
- this.db = tableInfo[0];
- this.table = tableInfo[1];
this.username = selectdbConfig.getUsername();
this.password = selectdbConfig.getPassword();
this.labelGenerator = labelGenerator;
@@ -178,6 +171,7 @@ public class SelectDBStageLoad implements Serializable {
}
public void close() {
+ this.started.set(false);
this.loadExecutorService.shutdown();
}
@@ -185,11 +179,6 @@ public class SelectDBStageLoad implements Serializable {
this.currentCheckpointID = currentCheckpointID;
}
- @VisibleForTesting
- public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
- this.httpClientBuilder = httpClientBuilder;
- }
-
class StageLoadAsyncExecutor implements Runnable {
@Override
public void run() {
@@ -200,6 +189,18 @@ public class SelectDBStageLoad implements Serializable {
if (buffer != null && buffer.getFileName() != null) {
uploadToStorage(buffer.getFileName(), buffer);
fileList.add(buffer.getFileName());
+ if (!selectdbConfig.isEnable2PC()) {
+ CopySQLBuilder copySQLBuilder =
+ new CopySQLBuilder(selectdbConfig,
fileList);
+ String copySql = copySQLBuilder.buildCopySQL();
+ CopySQLUtil.copyFileToDatabase(
+ selectdbConfig,
+ selectdbConfig.getClusterName(),
+ copySql,
+ hostPort);
+ log.info("clear the file list {}", fileList);
+ clearFileList();
+ }
}
} catch (Exception e) {
log.error("worker running error", e);
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
index 9ccf55451a..fb1919c336 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
@@ -23,7 +23,7 @@ import org.apache.http.impl.client.HttpClients;
/** util to build http client. */
public class HttpUtil {
- public HttpUtil() {}
+ private HttpUtil() {}
private static final HttpClientBuilder HTTP_CLIENT_BUILDER =
HttpClients.custom().disableRedirectHandling();