This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 11e94b216 [Improve][Connector-V2][SelectDB Cloud]Refactor some
SelectDB Cloud Sink code as well as support copy into batch and async flush and
cdc (#4312)
11e94b216 is described below
commit 11e94b216fc62a073fe42de9df7cb18f687de855
Author: yongkang.zhong <[email protected]>
AuthorDate: Thu Mar 9 17:58:41 2023 +0800
[Improve][Connector-V2][SelectDB Cloud]Refactor some SelectDB Cloud Sink
code as well as support copy into batch and async flush and cdc (#4312)
* [Improve][Connector-V2][SelectDB Cloud]Refactor some SelectDB Cloud Sink
code as well as support copy into batch and async flush and cdc
* add doc
* add doc2
* style
* fix
---
.../connector-v2/Error-Quick-Reference-Manual.md | 14 +-
docs/en/connector-v2/sink/SelectDB-Cloud.md | 71 +++--
release-note.md | 2 +
.../selectdb/serialize/SelectDBCsvSerializer.java | 55 ----
.../serialize/SelectDBDelimiterParser.java | 81 ------
.../selectdb/serialize/SelectDBJsonSerializer.java | 54 ----
.../selectdb/sink/writer/RecordBuffer.java | 141 ---------
.../selectdb/sink/writer/RecordStream.java | 65 -----
.../selectdb/sink/writer/SelectDBCopyInto.java | 264 -----------------
.../selectdb/sink/writer/SelectDBSinkWriter.java | 257 -----------------
.../connector/selectdb/util/StringUtil.java | 32 ---
.../selectdb/config/SelectDBConfig.java | 52 ++--
.../exception/SelectDBConnectorErrorCode.java | 12 +-
.../exception/SelectDBConnectorException.java | 2 +-
.../selectdb/rest/BaseResponse.java | 2 +-
.../selectdb/rest/CopyIntoResp.java | 2 +-
.../selectdb/serialize/SeaTunnelRowConverter.java} | 18 +-
.../selectdb/serialize/SeaTunnelRowSerializer.java | 150 ++++++++++
.../selectdb/serialize/SelectDBSerializer.java | 2 +-
.../selectdb/sink/EscapeHandler.java | 20 +-
.../selectdb/sink/SelectDBSink.java | 26 +-
.../sink/committer/SelectDBCommitInfo.java | 2 +-
.../committer/SelectDBCommitInfoSerializer.java | 2 +-
.../selectdb/sink/committer/SelectDBCommitter.java | 26 +-
.../selectdb/sink/writer/CopySQLBuilder.java | 6 +-
.../selectdb/sink/writer/LabelGenerator.java | 12 +-
.../selectdb/sink/writer/LoadConstants.java | 2 +-
.../selectdb/sink/writer/LoadStatus.java | 2 +-
.../selectdb/sink/writer/RecordBuffer.java | 80 ++++++
.../selectdb/sink/writer/SelectDBSinkState.java | 7 +-
.../sink/writer/SelectDBSinkStateSerializer.java | 5 +-
.../selectdb/sink/writer/SelectDBSinkWriter.java | 147 ++++++++++
.../selectdb/sink/writer/SelectDBStageLoad.java | 314 +++++++++++++++++++++
.../selectdb/util/HttpPostBuilder.java | 2 +-
.../selectdb/util/HttpPutBuilder.java | 2 +-
.../selectdb/util/HttpUtil.java | 2 +-
.../selectdb/util/ResponseUtil.java | 2 +-
37 files changed, 828 insertions(+), 1107 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index da0afa41d..c5fec98a1 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -169,16 +169,10 @@ problems encountered by users.
## SelectDB Cloud Connector Error Codes
-| code | description |
solution
|
-|-------------|-------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
-| SelectDB-01 | upload file to stage failed | When users encounter
this error code, it means that upload file to SelectDB Cloud failed, please
check the configuration and network. |
-| SelectDB-01 | commit copy into sql failed | When users encounter
this error code, it means that commit copy into sql to SelectDB Cloud failed,
please check the configuration. |
-| SelectDB-03 | Closing httpClient failed | When users encounter
this error code, it means that closing the http connection failed. please check
the network. |
-| SelectDB-04 | Get the redirected s3 address filed | When users encounter
this error code, it means that get the redirected s3 address failed, please
check the network. |
-| SelectDB-05 | error while loading data | When users encounter
this error code, it means that the file write check failed. please check the
configuration. |
-| SelectDB-07 | buffer stop failed | When users encounter
this error code, it means that the buffer stop failed. Check the detailed
exception information. |
-| SelectDB-08 | buffer read failed | When users encounter
this error code, it means that the buffer read failed. Check the detailed
exception information. |
-| SelectDB-09 | buffer write failed | When users encounter
this error code, it means that the buffer write failed. Check the detailed
exception information. |
+| code | description |
solution
|
+|-------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|
+| SelectDB-01 | stage load file error | When users encounter this error
code, it means that stage load file to SelectDB Cloud failed, please check the
configuration and network. |
+| SelectDB-02 | commit copy into sql failed | When users encounter this error
code, it means that commit copy into sql to SelectDB Cloud failed, please check
the configuration. |
## Clickhouse Connector Error Codes
diff --git a/docs/en/connector-v2/sink/SelectDB-Cloud.md
b/docs/en/connector-v2/sink/SelectDB-Cloud.md
index ccb304ba1..4ab7679e0 100644
--- a/docs/en/connector-v2/sink/SelectDB-Cloud.md
+++ b/docs/en/connector-v2/sink/SelectDB-Cloud.md
@@ -7,27 +7,33 @@
Used to send data to SelectDB Cloud. Both support streaming and batch mode.
The internal implementation of SelectDB Cloud sink connector upload after
batch caching and commit the CopyInto sql to load data into the table.
+:::tip
+
+Version Supported
+
+* supported `SelectDB Cloud version is >= 2.2.x`
+
+:::
+
## Key features
- [x] [exactly-once](../../concept/connector-v2-features.md)
-
-By default, we use 2PC commit to ensure `exactly-once`
+- [x] [cdc](../../concept/connector-v2-features.md)
## Options
-| name | type | required | default value |
-|---------------------|--------|----------|-----------------|
-| load-url | string | yes | - |
-| jdbc-url | string | yes | - |
-| cluster-name | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| table.identifier | string | yes | - |
-| selectdb.config | map | yes | - |
-| sink.buffer-size | int | no | 1024*1024 (1MB) |
-| sink.buffer-count | int | no | 3 |
-| sink.max-retries | int | no | 1 |
-| sink.check-interval | int | no | 10000 |
+| name | type | required | default value |
+|-------------------|--------|----------|------------------------|
+| load-url | string | yes | - |
+| jdbc-url | string | yes | - |
+| cluster-name | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| table.identifier | string | yes | - |
+| selectdb.config | map | yes | - |
+| sink.buffer-size | int | no | 10 * 1024 * 1024 (1MB) |
+| sink.buffer-count | int | no | 10000 |
+| sink.max-retries | int | no | 3 |
### load-url [string]
@@ -56,33 +62,36 @@ The name of `SelectDB Cloud` table, the format is
`database.table`
### sink.properties [string]
Write property configuration
+
CSV Write:
+
+```
selectdb.config {
-file.type='csv'
-file.column_separator=','
-file.line_delimiter='\n'
+ file.type="csv"
+ file.column_separator=","
+ file.line_delimiter="\n"
}
+```
+
JSON Write:
+
+```
selectdb.config {
-file.type="json"
-file.strip_outer_array="false"
+ file.type="json"
}
+```
### sink.buffer-size [string]
-Write data cache buffer size, unit byte. The default is 1 MB, and it is not
recommended to modify it.
+The maximum capacity of the cache, in bytes, that is flushed to the object
storage. The default is 10MB. it is not recommended to modify it.
### sink.buffer-count [string]
-The number of write data cache buffers, the default is 3, it is not
recommended to modify.
+Maximum number of entries flushed to the object store. The default value is
10000. it is not recommended to modify.
### sink.max-retries [string]
-The maximum number of retries in the Commit phase, the default is 1.
-
-### sink.check-interval [string]
-
-Periodic interval for writing files, in milliseconds, default 10 seconds.
+The maximum number of retries in the Commit phase, the default is 3.
## Example
@@ -99,7 +108,6 @@ sink {
password="******"
selectdb.config {
file.type="json"
- file.strip_outer_array="false"
}
}
}
@@ -117,9 +125,9 @@ sink {
username="admin"
password="******"
selectdb.config {
- file.type='csv'
- file.column_separator=','
- file.line_delimiter='\n'
+ file.type="csv"
+ file.column_separator=","
+ file.line_delimiter="\n"
}
}
}
@@ -130,4 +138,5 @@ sink {
### next version
- [Feature] Support SelectDB Cloud Sink Connector
[3958](https://github.com/apache/incubator-seatunnel/pull/3958)
+- [Improve] Refactor some SelectDB Cloud Sink code as well as support copy
into batch and async flush and cdc
[4312](https://github.com/apache/incubator-seatunnel/pull/4312)
diff --git a/release-note.md b/release-note.md
index 36a7d3762..2b026074c 100644
--- a/release-note.md
+++ b/release-note.md
@@ -40,11 +40,13 @@
- [API]Add parallelism and column projection interface #3829
- [API]Add get source method to all source connector #3846
- [Hive] Support read user-defined partitions #3842
+- [SelectDB Cloud] Support SelectDB Cloud Sink Connector #3958
- [Hive] Support read text table & Column projection #4105
- [File] Support column projection #4105
- [Github] Add github source connector #4155
- [Jdbc] Add database field to sink config #4199
- [Doris] Refactor some Doris Sink code as well as support 2pc and cdc #4235
+- [SelectDB Cloud] Refactor some SelectDB Cloud Sink code as well as support
copy into batch and async flush and cdc #4312
### Zeta Engine
- [Chore] Remove unnecessary dependencies #3795
- [Core] Improve job restart of all node down #3784
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBCsvSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBCsvSerializer.java
deleted file mode 100644
index 3937bdf2a..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBCsvSerializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.serialize;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-public class SelectDBCsvSerializer extends SelectDBBaseSerializer implements
SelectDBSerializer {
- private static final long serialVersionUID = 1L;
-
- private final String columnSeparator;
- private final SeaTunnelRowType seaTunnelRowType;
-
- public SelectDBCsvSerializer(String sp, SeaTunnelRowType seaTunnelRowType)
{
- this.seaTunnelRowType = seaTunnelRowType;
- this.columnSeparator = SelectDBDelimiterParser.parse(sp, "\t");
- }
-
- @Override
- public void open() throws IOException {}
-
- @Override
- public byte[] serialize(SeaTunnelRow row) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < row.getFields().length; i++) {
- String value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
- sb.append(null == value ? "\\N" : value);
- if (i < row.getFields().length - 1) {
- sb.append(columnSeparator);
- }
- }
- return sb.toString().getBytes(StandardCharsets.UTF_8);
- }
-
- @Override
- public void close() throws IOException {}
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBDelimiterParser.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBDelimiterParser.java
deleted file mode 100644
index ba1628d31..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBDelimiterParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.serialize;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
-
-import com.google.common.base.Strings;
-
-import java.io.StringWriter;
-
-public class SelectDBDelimiterParser {
- private static final int SHIFT = 4;
-
- private static final String HEX_STRING = "0123456789ABCDEF";
-
- public static String parse(String sp, String dSp) throws RuntimeException {
- if (Strings.isNullOrEmpty(sp)) {
- return dSp;
- }
- if (!sp.toUpperCase().startsWith("\\X")) {
- return sp;
- }
- String hexStr = sp.substring(2);
- // check hex str
- if (hexStr.isEmpty()) {
- throw new SelectDBConnectorException(
- CommonErrorCode.ILLEGAL_ARGUMENT,
- "Failed to parse delimiter: `Hex str is empty`");
- }
- if (hexStr.length() % 2 != 0) {
- throw new SelectDBConnectorException(
- CommonErrorCode.ILLEGAL_ARGUMENT,
- "Failed to parse delimiter: `Hex str is empty`");
- }
- for (char hexChar : hexStr.toUpperCase().toCharArray()) {
- if (HEX_STRING.indexOf(hexChar) == -1) {
- throw new SelectDBConnectorException(
- CommonErrorCode.ILLEGAL_ARGUMENT,
- "Failed to parse delimiter: `Hex str is empty`");
- }
- }
- // transform to separator
- StringWriter writer = new StringWriter();
- for (byte b : hexStrToBytes(hexStr)) {
- writer.append((char) b);
- }
- return writer.toString();
- }
-
- private static byte[] hexStrToBytes(String hexStr) {
- String upperHexStr = hexStr.toUpperCase();
- int length = upperHexStr.length() / 2;
- char[] hexChars = upperHexStr.toCharArray();
- byte[] bytes = new byte[length];
- for (int i = 0; i < length; i++) {
- int pos = i * 2;
- bytes[i] = (byte) (charToByte(hexChars[pos]) << SHIFT |
charToByte(hexChars[pos + 1]));
- }
- return bytes;
- }
-
- private static byte charToByte(char c) {
- return (byte) HEX_STRING.indexOf(c);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBJsonSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBJsonSerializer.java
deleted file mode 100644
index 298616db2..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBJsonSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.serialize;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.utils.JsonUtils;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SelectDBJsonSerializer extends SelectDBBaseSerializer implements
SelectDBSerializer {
-
- private static final long serialVersionUID = 1L;
- private final SeaTunnelRowType seaTunnelRowType;
-
- public SelectDBJsonSerializer(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
- @Override
- public void open() throws IOException {}
-
- @Override
- public byte[] serialize(SeaTunnelRow row) {
- Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
-
- for (int i = 0; i < row.getFields().length; i++) {
- String value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
- rowMap.put(seaTunnelRowType.getFieldName(i), value);
- }
- return JsonUtils.toJsonString(rowMap).getBytes(StandardCharsets.UTF_8);
- }
-
- @Override
- public void close() throws IOException {}
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/RecordBuffer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/RecordBuffer.java
deleted file mode 100644
index 92797f1f8..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/RecordBuffer.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.sink.writer;
-
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static com.google.common.base.Preconditions.checkState;
-import static
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode.BUFFER_STOP_FAILED;
-
-@Slf4j
-public class RecordBuffer {
- BlockingQueue<ByteBuffer> writeQueue;
- BlockingQueue<ByteBuffer> readQueue;
- int bufferCapacity;
- int queueSize;
- ByteBuffer currentWriteBuffer;
- ByteBuffer currentReadBuffer;
-
- public RecordBuffer(int capacity, int queueSize) {
- log.info("init RecordBuffer capacity {}, count {}", capacity,
queueSize);
- checkState(capacity > 0);
- checkState(queueSize > 1);
- this.writeQueue = new ArrayBlockingQueue<>(queueSize);
- for (int index = 0; index < queueSize; index++) {
- this.writeQueue.add(ByteBuffer.allocate(capacity));
- }
- readQueue = new LinkedBlockingDeque<>();
- this.bufferCapacity = capacity;
- this.queueSize = queueSize;
- }
-
- public void startBufferData() {
- log.info(
- "start buffer data, read queue size {}, write queue size {}",
- readQueue.size(),
- writeQueue.size());
- checkState(readQueue.size() == 0);
- checkState(writeQueue.size() == queueSize);
- for (ByteBuffer byteBuffer : writeQueue) {
- checkState(byteBuffer.position() == 0);
- checkState(byteBuffer.remaining() == bufferCapacity);
- }
- }
-
- public void stopBufferData() throws IOException {
- try {
- // add Empty buffer as finish flag.
- boolean isEmpty = false;
- if (currentWriteBuffer != null) {
- currentWriteBuffer.flip();
- // check if the current write buffer is empty.
- isEmpty = currentWriteBuffer.limit() == 0;
- readQueue.put(currentWriteBuffer);
- currentWriteBuffer = null;
- }
- if (!isEmpty) {
- ByteBuffer byteBuffer = writeQueue.take();
- byteBuffer.flip();
- checkState(byteBuffer.limit() == 0);
- readQueue.put(byteBuffer);
- }
- } catch (Exception e) {
- throw new SelectDBConnectorException(BUFFER_STOP_FAILED, e);
- }
- }
-
- public void write(byte[] buf) throws InterruptedException {
- int wPos = 0;
- do {
- if (currentWriteBuffer == null) {
- currentWriteBuffer = writeQueue.take();
- }
- int available = currentWriteBuffer.remaining();
- int nWrite = Math.min(available, buf.length - wPos);
- currentWriteBuffer.put(buf, wPos, nWrite);
- wPos += nWrite;
- if (currentWriteBuffer.remaining() == 0) {
- currentWriteBuffer.flip();
- readQueue.put(currentWriteBuffer);
- currentWriteBuffer = null;
- }
- } while (wPos != buf.length);
- }
-
- public int read(byte[] buf) throws InterruptedException {
- if (currentReadBuffer == null) {
- currentReadBuffer = readQueue.take();
- }
- // add empty buffer as end flag
- if (currentReadBuffer.limit() == 0) {
- recycleBuffer(currentReadBuffer);
- currentReadBuffer = null;
- checkState(readQueue.size() == 0);
- return -1;
- }
- int available = currentReadBuffer.remaining();
- int nRead = Math.min(available, buf.length);
- currentReadBuffer.get(buf, 0, nRead);
- if (currentReadBuffer.remaining() == 0) {
- recycleBuffer(currentReadBuffer);
- currentReadBuffer = null;
- }
- return nRead;
- }
-
- private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
- buffer.clear();
- writeQueue.put(buffer);
- }
-
- public int getWriteQueueSize() {
- return writeQueue.size();
- }
-
- public int getReadQueueSize() {
- return readQueue.size();
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/RecordStream.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/RecordStream.java
deleted file mode 100644
index 5c7a72cbf..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/RecordStream.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.sink.writer;
-
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import static
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode.BUFFER_READ_FAILED;
-import static
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode.BUFFER_WRITE_FAILED;
-
-/** Record Stream for writing record. */
-public class RecordStream extends InputStream {
- private final RecordBuffer recordBuffer;
-
- @Override
- public int read() throws IOException {
- return 0;
- }
-
- public RecordStream(int bufferSize, int bufferCount) {
- this.recordBuffer = new RecordBuffer(bufferSize, bufferCount);
- }
-
- public void startInput() {
- recordBuffer.startBufferData();
- }
-
- public void endInput() throws IOException {
- recordBuffer.stopBufferData();
- }
-
- @Override
- public int read(byte[] buff) throws IOException {
- try {
- return recordBuffer.read(buff);
- } catch (InterruptedException e) {
- throw new SelectDBConnectorException(BUFFER_READ_FAILED, e);
- }
- }
-
- public void write(byte[] buff) throws IOException {
- try {
- recordBuffer.write(buff);
- } catch (InterruptedException e) {
- throw new SelectDBConnectorException(BUFFER_WRITE_FAILED, e);
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBCopyInto.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBCopyInto.java
deleted file mode 100644
index b2e17e793..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBCopyInto.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.sink.writer;
-
-import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
-import org.apache.seatunnel.connector.selectdb.rest.BaseResponse;
-import org.apache.seatunnel.connector.selectdb.util.HttpPutBuilder;
-import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
-import org.apache.seatunnel.connector.selectdb.util.StringUtil;
-
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.entity.InputStreamEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-import static
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode.CLOSE_HTTP_FAILED;
-import static
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode.REDIRECTED_FAILED;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
-
-@Slf4j
-public class SelectDBCopyInto implements Serializable {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final int HTTP_TEMPORARY_REDIRECT = 200;
- private static final int HTTP_SUCCESS = 307;
- private final LabelGenerator labelGenerator;
- private final byte[] lineDelimiter;
- private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
-
- private String uploadUrl;
- private String hostPort;
- private final String user;
- private final String passwd;
- private final String db;
- private final String table;
- private final boolean enable2PC;
- private final Properties streamLoadProp;
- private final RecordStream recordStream;
- private Future<CloseableHttpResponse> pendingLoadFuture;
- private final CloseableHttpClient httpClient;
- private final ExecutorService executorService;
- private boolean loadBatchFirstRecord;
- private List<String> fileList = new CopyOnWriteArrayList();
-
- private String fileName;
-
- public SelectDBCopyInto(
- SelectDBConfig selectdbConfig,
- LabelGenerator labelGenerator,
- CloseableHttpClient httpClient) {
- this.hostPort = selectdbConfig.getLoadUrl();
- String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
- this.db = tableInfo[0];
- this.table = tableInfo[1];
- this.user = selectdbConfig.getUsername();
- this.passwd = selectdbConfig.getPassword();
- this.labelGenerator = labelGenerator;
- this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
- this.enable2PC = selectdbConfig.getEnable2PC();
- this.streamLoadProp = selectdbConfig.getStreamLoadProps();
- this.httpClient = httpClient;
- this.executorService =
- new ThreadPoolExecutor(
- 1,
- 1,
- 0L,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
- new
ThreadFactoryBuilder().setNameFormat("file-load-upload").build());
- this.recordStream =
- new RecordStream(selectdbConfig.getBufferSize(),
selectdbConfig.getBufferCount());
- lineDelimiter =
- streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT).getBytes();
- loadBatchFirstRecord = true;
- }
-
- public String getDb() {
- return db;
- }
-
- public String getHostPort() {
- return hostPort;
- }
-
- public Future<CloseableHttpResponse> getPendingLoadFuture() {
- return pendingLoadFuture;
- }
-
- public String getFileName() {
- return fileName;
- }
-
- public List<String> getFileList() {
- return fileList;
- }
-
- public void clearFileList() {
- fileList.clear();
- }
-
- public void writeRecord(byte[] record) throws IOException {
- if (loadBatchFirstRecord) {
- loadBatchFirstRecord = false;
- } else {
- recordStream.write(lineDelimiter);
- }
- recordStream.write(record);
- }
-
- @VisibleForTesting
- public RecordStream getRecordStream() {
- return recordStream;
- }
-
- public BaseResponse<HashMap<String, String>>
handleResponse(CloseableHttpResponse response)
- throws IOException {
- try {
- final int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity()
!= null) {
- String loadResult = EntityUtils.toString(response.getEntity());
- if (StringUtil.isNullOrWhitespaceOnly(loadResult)) {
- return null;
- }
- log.info("response result {}", loadResult);
- BaseResponse<HashMap<String, String>> baseResponse =
- OBJECT_MAPPER.readValue(
- loadResult,
- new TypeReference<BaseResponse<HashMap<String,
String>>>() {});
- if (baseResponse.getCode() == 0) {
- return baseResponse;
- } else {
- throw new SelectDBConnectorException(
- SelectDBConnectorErrorCode.UPLOAD_FAILED,
baseResponse.getMsg());
- }
- }
- throw new SelectDBConnectorException(
- SelectDBConnectorErrorCode.UPLOAD_FAILED,
response.getStatusLine().toString());
- } finally {
- if (response != null) {
- response.close();
- }
- }
- }
-
- public void stopLoad() throws IOException {
- recordStream.endInput();
- log.info("file {} write stopped.", fileName);
- checkState(pendingLoadFuture != null);
- try {
- handleResponse(pendingLoadFuture.get());
- log.info("upload file {} finished", fileName);
- fileList.add(fileName);
- } catch (Exception e) {
- throw new
SelectDBConnectorException(SelectDBConnectorErrorCode.UPLOAD_FAILED, e);
- }
- }
-
- public void startLoad(String fileName) throws IOException {
- this.fileName = fileName;
- loadBatchFirstRecord = true;
- recordStream.startInput();
- log.info("file write started for {}", fileName);
- try {
- String address = getUploadAddress(fileName);
- log.info("redirect to s3 address:{}", address);
- InputStreamEntity entity = new InputStreamEntity(recordStream);
- HttpPutBuilder putBuilder = new HttpPutBuilder();
- putBuilder.setUrl(address).addCommonHeader().setEntity(entity);
- pendingLoadFuture =
- executorService.submit(
- () -> {
- log.info("start execute load {}", fileName);
- return new
HttpUtil().getHttpClient().execute(putBuilder.build());
- });
- } catch (Exception e) {
- String err = "failed to write data with fileName: " + fileName;
- log.warn(err, e);
- throw e;
- }
- }
-
- /** Get the redirected s3 address */
- public String getUploadAddress(String fileName) throws IOException {
- HttpPutBuilder putBuilder = new HttpPutBuilder();
- putBuilder
- .setUrl(uploadUrl)
- .addFileName(fileName)
- .addCommonHeader()
- .setEmptyEntity()
- .baseAuth(user, passwd);
-
- try (CloseableHttpResponse execute =
httpClient.execute(putBuilder.build())) {
- int statusCode = execute.getStatusLine().getStatusCode();
- String reason = execute.getStatusLine().getReasonPhrase();
- if (statusCode == HTTP_SUCCESS) {
- Header location = execute.getFirstHeader("location");
- String uploadAddress = location.getValue();
- return uploadAddress;
- } else {
- HttpEntity entity = execute.getEntity();
- String result = entity == null ? null :
EntityUtils.toString(entity);
- throw new SelectDBConnectorException(
- REDIRECTED_FAILED,
- "Could not get the redirected address. Status: "
- + statusCode
- + ", Reason: "
- + reason
- + ", Response: "
- + result);
- }
- }
- }
-
- public void close() throws IOException {
- if (null != httpClient) {
- try {
- httpClient.close();
- } catch (IOException e) {
- throw new SelectDBConnectorException(CLOSE_HTTP_FAILED, e);
- }
- }
- if (null != executorService) {
- executorService.shutdownNow();
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkWriter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkWriter.java
deleted file mode 100644
index ab62afe05..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkWriter.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.sink.writer;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
-import org.apache.seatunnel.connector.selectdb.serialize.SelectDBCsvSerializer;
-import
org.apache.seatunnel.connector.selectdb.serialize.SelectDBJsonSerializer;
-import org.apache.seatunnel.connector.selectdb.serialize.SelectDBSerializer;
-import
org.apache.seatunnel.connector.selectdb.sink.committer.SelectDBCommitInfo;
-import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static com.google.common.base.Preconditions.checkState;
-import static
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode.WHILE_LOADING_FAILED;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.FORMAT_KEY;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
-
-@Slf4j
-public class SelectDBSinkWriter
- implements SinkWriter<SeaTunnelRow, SelectDBCommitInfo,
SelectDBSinkState> {
- private final SelectDBConfig selectdbConfig;
- private final long lastCheckpointId;
- private volatile long currentCheckpointId;
- private SelectDBCopyInto selectdbCopyInto;
- volatile boolean loading;
- private final String labelPrefix;
- private final byte[] lineDelimiter;
- private final LabelGenerator labelGenerator;
- private final int intervalTime;
- private final SelectDBSinkState selectdbSinkState;
- private final SelectDBSerializer serializer;
- private final transient ScheduledExecutorService scheduledExecutorService;
- private transient volatile Exception loadException = null;
- private final AtomicInteger fileNum;
-
- private final ArrayList<byte[]> cache = new ArrayList<>();
- private int cacheSize = 0;
- private int cacheCnt = 0;
-
- private static final long MAX_CACHE_SIZE = 1024 * 1024L;
- private static final int INITIAL_DELAY = 1000;
-
- public SelectDBSinkWriter(
- SinkWriter.Context context,
- List<SelectDBSinkState> state,
- SeaTunnelRowType seaTunnelRowType,
- Config pluginConfig) {
- this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
- this.lastCheckpointId = context.getIndexOfSubtask();
- log.info("restore checkpointId {}", lastCheckpointId);
- this.currentCheckpointId = lastCheckpointId;
- log.info("labelPrefix " + selectdbConfig.getLabelPrefix());
- this.selectdbSinkState = new
SelectDBSinkState(selectdbConfig.getLabelPrefix());
- this.labelPrefix = selectdbConfig.getLabelPrefix() + "_" +
context.getIndexOfSubtask();
- this.lineDelimiter =
- selectdbConfig
- .getStreamLoadProps()
- .getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT)
- .getBytes();
- this.labelGenerator = new LabelGenerator(labelPrefix,
selectdbConfig.getEnable2PC());
- this.scheduledExecutorService =
- new ScheduledThreadPoolExecutor(
- 1,
- new ThreadFactoryBuilder()
- .setNameFormat("file-load-check-" +
context.getIndexOfSubtask())
- .build());
- this.serializer = createSerializer(selectdbConfig, seaTunnelRowType);
- this.intervalTime = selectdbConfig.getCheckInterval();
- this.loading = false;
- this.fileNum = new AtomicInteger();
- }
-
- public void initializeLoad(List<SelectDBSinkState> state) throws
IOException {
- this.selectdbCopyInto =
- new SelectDBCopyInto(
- selectdbConfig, labelGenerator, new
HttpUtil().getHttpClient());
- currentCheckpointId = lastCheckpointId + 1;
- scheduledExecutorService.scheduleWithFixedDelay(
- this::checkDone, INITIAL_DELAY, intervalTime,
TimeUnit.MILLISECONDS);
- serializer.open();
- }
-
- @Override
- public synchronized void write(SeaTunnelRow element) throws IOException {
- checkLoadException();
- byte[] serialize = serializer.serialize(element);
- if (Objects.isNull(serialize)) {
- // schema change is null
- return;
- }
- if (cacheSize > MAX_CACHE_SIZE) {
- flush(serialize);
- } else {
- cacheSize += serialize.length;
- cacheCnt++;
- cache.add(serialize);
- }
- }
-
- public synchronized void flush(byte[] serialize) throws IOException {
- if (!loading) {
- log.info("start load by cache full, cnt {}, size {}", cacheCnt,
cacheSize);
- startLoad();
- }
- this.selectdbCopyInto.writeRecord(serialize);
- }
-
- @Override
- public synchronized Optional<SelectDBCommitInfo> prepareCommit() throws
IOException {
- checkState(selectdbCopyInto != null);
- if (!loading) {
- // No data was written during the entire checkpoint period
- log.info("start load by checkpoint, cnt {} size {} ", cacheCnt,
cacheSize);
- startLoad();
- }
- log.info("stop load by checkpoint");
- stopLoad();
- CopySQLBuilder copySQLBuilder =
- new CopySQLBuilder(selectdbConfig,
selectdbCopyInto.getFileList());
- String copySql = copySQLBuilder.buildCopySQL();
- return Optional.of(
- new SelectDBCommitInfo(
- selectdbCopyInto.getHostPort(),
selectdbConfig.getClusterName(), copySql));
- }
-
- @Override
- public synchronized List<SelectDBSinkState> snapshotState(long
checkpointId)
- throws IOException {
- checkState(selectdbCopyInto != null);
- this.currentCheckpointId = checkpointId + 1;
-
- log.info("clear the file list {}", selectdbCopyInto.getFileList());
- this.fileNum.set(0);
- this.selectdbCopyInto.clearFileList();
- return Collections.singletonList(selectdbSinkState);
- }
-
- @Override
- public void abortPrepare() {}
-
- private synchronized void startLoad() throws IOException {
- // If not started writing, make a streaming request
- this.selectdbCopyInto.startLoad(
- labelGenerator.generateLabel(currentCheckpointId,
fileNum.getAndIncrement()));
- if (!cache.isEmpty()) {
- // add line delimiter
- ByteBuffer buf =
- ByteBuffer.allocate(cacheSize + (cache.size() - 1) *
lineDelimiter.length);
- for (int i = 0; i < cache.size(); i++) {
- if (i > 0) {
- buf.put(lineDelimiter);
- }
- buf.put(cache.get(i));
- }
- this.selectdbCopyInto.writeRecord(buf.array());
- }
- this.loading = true;
- }
-
- private synchronized void stopLoad() throws IOException {
- this.loading = false;
- this.selectdbCopyInto.stopLoad();
- this.cacheSize = 0;
- this.cacheCnt = 0;
- this.cache.clear();
- }
-
- private synchronized void checkDone() {
- // s3 can't keep http long links, generate data files regularly
- log.info("start timer checker, interval {} ms", intervalTime);
- try {
- if (!loading) {
- log.info("not loading, skip timer checker");
- return;
- }
- if (selectdbCopyInto.getPendingLoadFuture() != null
- && !selectdbCopyInto.getPendingLoadFuture().isDone()) {
- log.info("stop load by timer checker");
- stopLoad();
- }
- } catch (Exception ex) {
- log.error("upload file failed, thread exited:", ex);
- loadException = ex;
- }
- }
-
- private void checkLoadException() {
- if (loadException != null) {
- throw new SelectDBConnectorException(WHILE_LOADING_FAILED,
loadException);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (scheduledExecutorService != null) {
- scheduledExecutorService.shutdownNow();
- }
- if (selectdbCopyInto != null) {
- selectdbCopyInto.close();
- }
- serializer.close();
- }
-
- public static SelectDBSerializer createSerializer(
- SelectDBConfig selectdbConfig, SeaTunnelRowType seaTunnelRowType) {
- if
(LoadConstants.CSV.equals(selectdbConfig.getStreamLoadProps().getProperty(FORMAT_KEY)))
{
- return new SelectDBCsvSerializer(
-
selectdbConfig.getStreamLoadProps().getProperty(FIELD_DELIMITER_KEY),
- seaTunnelRowType);
- }
- if (LoadConstants.JSON.equals(
- selectdbConfig.getStreamLoadProps().getProperty(FORMAT_KEY))) {
- return new SelectDBJsonSerializer(seaTunnelRowType);
- }
- throw new SelectDBConnectorException(
- CommonErrorCode.ILLEGAL_ARGUMENT,
- "Failed to create row serializer, unsupported `format` from
copy into load properties.");
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/StringUtil.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/StringUtil.java
deleted file mode 100644
index 16c8d6c6d..000000000
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/StringUtil.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connector.selectdb.util;
-
-public final class StringUtil {
- public static boolean isNullOrWhitespaceOnly(String str) {
- if (str != null && str.length() != 0) {
- int len = str.length();
- for (int i = 0; i < len; ++i) {
- if (!Character.isWhitespace(str.charAt(i))) {
- return false;
- }
- }
- }
- return true;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/config/SelectDBConfig.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
similarity index 82%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/config/SelectDBConfig.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
index 09671cae4..702538736 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/config/SelectDBConfig.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.config;
+package org.apache.seatunnel.connectors.selectdb.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,10 +35,9 @@ import java.util.UUID;
@Getter
@ToString
public class SelectDBConfig {
- private static final int DEFAULT_SINK_CHECK_INTERVAL = 10000;
private static final int DEFAULT_SINK_MAX_RETRIES = 3;
- private static final int DEFAULT_SINK_BUFFER_SIZE = 1024 * 1024;
- private static final int DEFAULT_SINK_BUFFER_COUNT = 3;
+ private static final int DEFAULT_SINK_BUFFER_SIZE = 10 * 1024 * 1024;
+ private static final int DEFAULT_SINK_BUFFER_COUNT = 10000;
// common option
public static final Option<String> LOAD_URL =
Options.key("load-url")
@@ -73,17 +72,6 @@ public class SelectDBConfig {
.withDescription("the jdbc password.");
// sink config options
- public static final Option<Boolean> SINK_ENABLE_2PC =
- Options.key("sink.enable-2pc")
- .booleanType()
- .defaultValue(true)
- .withDescription("enable 2PC while loading");
-
- public static final Option<Integer> SINK_CHECK_INTERVAL =
- Options.key("sink.check-interval")
- .intType()
- .defaultValue(DEFAULT_SINK_CHECK_INTERVAL)
- .withDescription("check exception with the interval while
loading");
public static final Option<Integer> SINK_MAX_RETRIES =
Options.key("sink.max-retries")
.intType()
@@ -110,6 +98,12 @@ public class SelectDBConfig {
.defaultValue(false)
.withDescription("whether to enable the delete function");
+ public static final Option<Integer> SINK_FLUSH_QUEUE_SIZE =
+ Options.key("sink.flush.queue-size")
+ .intType()
+ .defaultValue(1)
+ .withDescription("Queue length for async upload to object
storage");
+
public static final Option<Map<String, String>>
SELECTDB_SINK_CONFIG_PREFIX =
Options.key("selectdb.config")
.mapType()
@@ -124,14 +118,13 @@ public class SelectDBConfig {
private String username;
private String password;
private String tableIdentifier;
- private Boolean enable2PC;
private Boolean enableDelete;
private String labelPrefix;
- private Integer checkInterval;
private Integer maxRetries;
private Integer bufferSize;
private Integer bufferCount;
- private Properties streamLoadProps;
+ private Integer flushQueueSize;
+ private Properties StageLoadProps;
public static SelectDBConfig loadConfig(Config pluginConfig) {
SelectDBConfig selectdbConfig = new SelectDBConfig();
@@ -141,23 +134,13 @@ public class SelectDBConfig {
selectdbConfig.setUsername(pluginConfig.getString(USERNAME.key()));
selectdbConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
selectdbConfig.setTableIdentifier(pluginConfig.getString(TABLE_IDENTIFIER.key()));
-
selectdbConfig.setStreamLoadProps(parseCopyIntoProperties(pluginConfig));
+
selectdbConfig.setStageLoadProps(parseCopyIntoProperties(pluginConfig));
- if (pluginConfig.hasPath(SINK_ENABLE_2PC.key())) {
-
selectdbConfig.setEnable2PC(pluginConfig.getBoolean(SINK_ENABLE_2PC.key()));
- } else {
- selectdbConfig.setEnable2PC(SINK_ENABLE_2PC.defaultValue());
- }
if (pluginConfig.hasPath(SINK_LABEL_PREFIX.key())) {
selectdbConfig.setLabelPrefix(pluginConfig.getString(SINK_LABEL_PREFIX.key()));
} else {
selectdbConfig.setLabelPrefix(SINK_LABEL_PREFIX.defaultValue());
}
- if (pluginConfig.hasPath(SINK_CHECK_INTERVAL.key())) {
-
selectdbConfig.setCheckInterval(pluginConfig.getInt(SINK_CHECK_INTERVAL.key()));
- } else {
-
selectdbConfig.setCheckInterval(SINK_CHECK_INTERVAL.defaultValue());
- }
if (pluginConfig.hasPath(SINK_MAX_RETRIES.key())) {
selectdbConfig.setMaxRetries(pluginConfig.getInt(SINK_MAX_RETRIES.key()));
} else {
@@ -178,20 +161,25 @@ public class SelectDBConfig {
} else {
selectdbConfig.setEnableDelete(SINK_ENABLE_DELETE.defaultValue());
}
+ if (pluginConfig.hasPath(SINK_FLUSH_QUEUE_SIZE.key())) {
+
selectdbConfig.setFlushQueueSize(pluginConfig.getInt(SINK_FLUSH_QUEUE_SIZE.key()));
+ } else {
+
selectdbConfig.setFlushQueueSize(SINK_FLUSH_QUEUE_SIZE.defaultValue());
+ }
return selectdbConfig;
}
private static Properties parseCopyIntoProperties(Config pluginConfig) {
- Properties streamLoadProps = new Properties();
+ Properties stageLoadProps = new Properties();
if (CheckConfigUtil.isValidParam(pluginConfig,
SELECTDB_SINK_CONFIG_PREFIX.key())) {
pluginConfig
.getObject(SELECTDB_SINK_CONFIG_PREFIX.key())
.forEach(
(key, value) -> {
final String configKey = key.toLowerCase();
- streamLoadProps.put(configKey,
value.unwrapped().toString());
+ stageLoadProps.put(configKey,
value.unwrapped().toString());
});
}
- return streamLoadProps;
+ return stageLoadProps;
}
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/exception/SelectDBConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/exception/SelectDBConnectorErrorCode.java
similarity index 69%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/exception/SelectDBConnectorErrorCode.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/exception/SelectDBConnectorErrorCode.java
index 1489ba6bb..d34503b7b 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/exception/SelectDBConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/exception/SelectDBConnectorErrorCode.java
@@ -15,19 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.exception;
+package org.apache.seatunnel.connectors.selectdb.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum SelectDBConnectorErrorCode implements SeaTunnelErrorCode {
- UPLOAD_FAILED("SelectDB-01", "upload file error"),
- COMMIT_FAILED("SelectDB-02", "commit error"),
- CLOSE_HTTP_FAILED("SelectDB-03", "Closing httpClient failed"),
- REDIRECTED_FAILED("SelectDB-04", "Get the redirected s3 address filed"),
- WHILE_LOADING_FAILED("SelectDB-05", "error while loading data"),
- BUFFER_STOP_FAILED("SelectDB-06", "buffer stop failed"),
- BUFFER_READ_FAILED("SelectDB-07", "buffer read failed"),
- BUFFER_WRITE_FAILED("SelectDB-08", "buffer write failed");
+ STAGE_LOAD_FAILED("SelectDB-01", "stage load file error"),
+ COMMIT_FAILED("SelectDB-02", "commit error");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/exception/SelectDBConnectorException.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/exception/SelectDBConnectorException.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/exception/SelectDBConnectorException.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/exception/SelectDBConnectorException.java
index a8c5b8d59..bfe1fdc92 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/exception/SelectDBConnectorException.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/exception/SelectDBConnectorException.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.exception;
+package org.apache.seatunnel.connectors.selectdb.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/rest/BaseResponse.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/BaseResponse.java
similarity index 95%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/rest/BaseResponse.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/BaseResponse.java
index dcf14a7be..c2734512f 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/rest/BaseResponse.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/BaseResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.rest;
+package org.apache.seatunnel.connectors.selectdb.rest;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/rest/CopyIntoResp.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopyIntoResp.java
similarity index 95%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/rest/CopyIntoResp.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopyIntoResp.java
index 2ffe85bdc..4e64d709d 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/rest/CopyIntoResp.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/rest/CopyIntoResp.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.rest;
+package org.apache.seatunnel.connectors.selectdb.rest;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBBaseSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SeaTunnelRowConverter.java
similarity index 79%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBBaseSerializer.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SeaTunnelRowConverter.java
index 267da5f06..aee6889ec 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBBaseSerializer.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SeaTunnelRowConverter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.serialize;
+package org.apache.seatunnel.connectors.selectdb.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
@@ -23,7 +23,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
+import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
import lombok.Builder;
@@ -31,7 +31,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
-public class SelectDBBaseSerializer {
+public class SeaTunnelRowConverter {
@Builder.Default private DateUtils.Formatter dateFormatter =
DateUtils.Formatter.YYYY_MM_DD;
@Builder.Default
@@ -39,33 +39,27 @@ public class SelectDBBaseSerializer {
@Builder.Default private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
- protected String convert(SeaTunnelDataType dataType, Object val) {
+ protected Object convert(SeaTunnelDataType dataType, Object val) {
if (val == null) {
return null;
}
switch (dataType.getSqlType()) {
case TINYINT:
case SMALLINT:
- return String.valueOf(((Number) val).shortValue());
case INT:
- return String.valueOf(((Number) val).intValue());
case BIGINT:
- return String.valueOf(((Number) val).longValue());
case FLOAT:
- return String.valueOf(((Number) val).floatValue());
case DOUBLE:
- return String.valueOf(((Number) val).doubleValue());
case DECIMAL:
case BOOLEAN:
- return val.toString();
+ case STRING:
+ return val;
case DATE:
return DateUtils.toString((LocalDate) val, dateFormatter);
case TIME:
return TimeUtils.toString((LocalTime) val, timeFormatter);
case TIMESTAMP:
return DateTimeUtils.toString((LocalDateTime) val,
dateTimeFormatter);
- case STRING:
- return (String) val;
case ARRAY:
case MAP:
return JsonUtils.toJsonString(val);
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..83730e0e0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.selectdb.serialize;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants.CSV;
+import static
org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants.JSON;
+import static
org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants.NULL_VALUE;
+
+public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements
SelectDBSerializer {
+ String type;
+ private ObjectMapper objectMapper;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final String fieldDelimiter;
+ private final boolean enableDelete;
+
+ public SeaTunnelRowSerializer(
+ String type,
+ SeaTunnelRowType seaTunnelRowType,
+ String fieldDelimiter,
+ boolean enableDelete) {
+ this.type = type;
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.fieldDelimiter = fieldDelimiter;
+ this.enableDelete = enableDelete;
+ if (JSON.equals(type)) {
+ objectMapper = new ObjectMapper();
+ }
+ }
+
+ @Override
+ public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
+ String valString;
+ if (JSON.equals(type)) {
+ valString = buildJsonString(seaTunnelRow);
+ } else if (CSV.equals(type)) {
+ valString = buildCSVString(seaTunnelRow);
+ } else {
+ throw new IllegalArgumentException("The type " + type + " is not
supported!");
+ }
+ return valString.getBytes(StandardCharsets.UTF_8);
+ }
+
+ public String buildJsonString(SeaTunnelRow row) throws IOException {
+ Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
+
+ for (int i = 0; i < row.getFields().length; i++) {
+ Object value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
+ rowMap.put(seaTunnelRowType.getFieldName(i), value);
+ }
+ if (enableDelete) {
+ rowMap.put(LoadConstants.DORIS_DELETE_SIGN,
parseDeleteSign(row.getRowKind()));
+ }
+ return objectMapper.writeValueAsString(rowMap);
+ }
+
+ public String buildCSVString(SeaTunnelRow row) throws IOException {
+ StringJoiner joiner = new StringJoiner(fieldDelimiter);
+ for (int i = 0; i < row.getFields().length; i++) {
+ Object field = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
+ String value = field != null ? field.toString() : NULL_VALUE;
+ joiner.add(value);
+ }
+ if (enableDelete) {
+ joiner.add(parseDeleteSign(row.getRowKind()));
+ }
+ return joiner.toString();
+ }
+
+ public String parseDeleteSign(RowKind rowKind) {
+ if (RowKind.INSERT.equals(rowKind) ||
RowKind.UPDATE_AFTER.equals(rowKind)) {
+ return "0";
+ } else if (RowKind.DELETE.equals(rowKind) ||
RowKind.UPDATE_BEFORE.equals(rowKind)) {
+ return "1";
+ } else {
+ throw new IllegalArgumentException("Unrecognized row kind:" +
rowKind.toString());
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Builder for RowDataSerializer. */
+ public static class Builder {
+ private SeaTunnelRowType seaTunnelRowType;
+ private String type;
+ private String fieldDelimiter;
+ private boolean deletable;
+
+ public Builder setType(String type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ return this;
+ }
+
+ public Builder setFieldDelimiter(String fieldDelimiter) {
+ this.fieldDelimiter = fieldDelimiter;
+ return this;
+ }
+
+ public Builder enableDelete(boolean deletable) {
+ this.deletable = deletable;
+ return this;
+ }
+
+ public SeaTunnelRowSerializer build() {
+ checkState(CSV.equals(type) && fieldDelimiter != null ||
JSON.equals(type));
+ return new SeaTunnelRowSerializer(type, seaTunnelRowType,
fieldDelimiter, deletable);
+ }
+ }
+
+ @Override
+ public void open() throws IOException {}
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SelectDBSerializer.java
similarity index 94%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBSerializer.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SelectDBSerializer.java
index 84a2c7c6a..b1851198d 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/serialize/SelectDBSerializer.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/serialize/SelectDBSerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.serialize;
+package org.apache.seatunnel.connectors.selectdb.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/EscapeHandler.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/EscapeHandler.java
similarity index 71%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/EscapeHandler.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/EscapeHandler.java
index e417ab65a..4e630df15 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/EscapeHandler.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/EscapeHandler.java
@@ -15,17 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink;
+package org.apache.seatunnel.connectors.selectdb.sink;
+
+import org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
-
/** Handler for escape in properties. */
public class EscapeHandler {
public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
@@ -48,13 +45,16 @@ public class EscapeHandler {
public void handle(Properties properties) {
String fieldDelimiter =
- properties.getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT);
+ properties.getProperty(
+ LoadConstants.FIELD_DELIMITER_KEY,
LoadConstants.FIELD_DELIMITER_DEFAULT);
if (fieldDelimiter.contains(ESCAPE_DELIMITERS_FLAGS)) {
- properties.setProperty(FIELD_DELIMITER_KEY,
escapeString(fieldDelimiter));
+ properties.setProperty(LoadConstants.FIELD_DELIMITER_KEY,
escapeString(fieldDelimiter));
}
- String lineDelimiter = properties.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
+ String lineDelimiter =
+ properties.getProperty(
+ LoadConstants.LINE_DELIMITER_KEY,
LoadConstants.LINE_DELIMITER_DEFAULT);
if (lineDelimiter.contains(ESCAPE_DELIMITERS_FLAGS)) {
- properties.setProperty(LINE_DELIMITER_KEY,
escapeString(lineDelimiter));
+ properties.setProperty(LoadConstants.LINE_DELIMITER_KEY,
escapeString(lineDelimiter));
}
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/SelectDBSink.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
similarity index 81%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/SelectDBSink.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index 4fdc70ab7..e8789fb2e 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/SelectDBSink.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink;
+package org.apache.seatunnel.connectors.selectdb.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -32,13 +32,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
-import
org.apache.seatunnel.connector.selectdb.sink.committer.SelectDBCommitInfo;
-import
org.apache.seatunnel.connector.selectdb.sink.committer.SelectDBCommitInfoSerializer;
-import
org.apache.seatunnel.connector.selectdb.sink.committer.SelectDBCommitter;
-import org.apache.seatunnel.connector.selectdb.sink.writer.SelectDBSinkState;
-import
org.apache.seatunnel.connector.selectdb.sink.writer.SelectDBSinkStateSerializer;
-import org.apache.seatunnel.connector.selectdb.sink.writer.SelectDBSinkWriter;
+import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
+import
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfo;
+import
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfoSerializer;
+import
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitter;
+import org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkState;
+import
org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkStateSerializer;
+import org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkWriter;
import com.google.auto.service.AutoService;
@@ -47,11 +47,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import static
org.apache.seatunnel.connector.selectdb.config.SelectDBConfig.CLUSTER_NAME;
-import static
org.apache.seatunnel.connector.selectdb.config.SelectDBConfig.JDBC_URL;
-import static
org.apache.seatunnel.connector.selectdb.config.SelectDBConfig.LOAD_URL;
-import static
org.apache.seatunnel.connector.selectdb.config.SelectDBConfig.TABLE_IDENTIFIER;
-import static
org.apache.seatunnel.connector.selectdb.config.SelectDBConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.CLUSTER_NAME;
+import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.JDBC_URL;
+import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.LOAD_URL;
+import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.TABLE_IDENTIFIER;
+import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.USERNAME;
@AutoService(SeaTunnelSink.class)
public class SelectDBSink
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitInfo.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitInfo.java
similarity index 95%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitInfo.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitInfo.java
index 23e1b3a2f..8227f3efd 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitInfo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink.committer;
+package org.apache.seatunnel.connectors.selectdb.sink.committer;
import lombok.EqualsAndHashCode;
import lombok.Getter;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitInfoSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitInfoSerializer.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitInfoSerializer.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitInfoSerializer.java
index 3d78bb560..d67fdde17 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitInfoSerializer.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitInfoSerializer.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.seatunnel.connector.selectdb.sink.committer;
+package org.apache.seatunnel.connectors.selectdb.sink.committer;
import org.apache.seatunnel.api.serialization.Serializer;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
similarity index 87%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitter.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
index e0d204860..7c210be84 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitter.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink.committer;
+package org.apache.seatunnel.connectors.selectdb.sink.committer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.sink.SinkCommitter;
-import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode;
-import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
-import org.apache.seatunnel.connector.selectdb.rest.BaseResponse;
-import org.apache.seatunnel.connector.selectdb.rest.CopyIntoResp;
-import org.apache.seatunnel.connector.selectdb.util.HttpPostBuilder;
-import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
-import org.apache.seatunnel.connector.selectdb.util.ResponseUtil;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
+import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connectors.selectdb.rest.CopyIntoResp;
+import org.apache.seatunnel.connectors.selectdb.sink.writer.LoadStatus;
+import org.apache.seatunnel.connectors.selectdb.util.HttpPostBuilder;
+import org.apache.seatunnel.connectors.selectdb.util.HttpUtil;
+import org.apache.seatunnel.connectors.selectdb.util.ResponseUtil;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.StringEntity;
@@ -44,9 +45,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadStatus.FAIL;
-import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadStatus.SUCCESS;
-
@Slf4j
public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
private static final String COMMIT_PATTERN = "http://%s/copy/query";
@@ -151,9 +149,9 @@ public class SelectDBCommitter implements
SinkCommitter<SelectDBCommitInfo> {
BaseResponse<CopyIntoResp> baseResponse =
objectMapper.readValue(
loadResult, new
TypeReference<BaseResponse<CopyIntoResp>>() {});
- if (baseResponse.getCode() == SUCCESS) {
+ if (baseResponse.getCode() == LoadStatus.SUCCESS) {
CopyIntoResp dataResp = baseResponse.getData();
- if (FAIL.equals(dataResp.getDataCode())) {
+ if (LoadStatus.FAIL.equals(dataResp.getDataCode())) {
log.error("copy into execute failed, reason:{}", loadResult);
return false;
} else {
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/CopySQLBuilder.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/CopySQLBuilder.java
similarity index 92%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/CopySQLBuilder.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/CopySQLBuilder.java
index 5f4f4ccc9..e521372fe 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/CopySQLBuilder.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/CopySQLBuilder.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink.writer;
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
-import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import java.util.List;
import java.util.Map;
@@ -34,7 +34,7 @@ public class CopySQLBuilder {
public CopySQLBuilder(SelectDBConfig selectdbConfig, List<String>
fileList) {
this.selectdbConfig = selectdbConfig;
this.fileList = fileList;
- this.properties = selectdbConfig.getStreamLoadProps();
+ this.properties = selectdbConfig.getStageLoadProps();
}
public String buildCopySQL() {
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LabelGenerator.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LabelGenerator.java
similarity index 71%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LabelGenerator.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LabelGenerator.java
index 27e150e72..3b9af48b7 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LabelGenerator.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LabelGenerator.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink.writer;
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
/** Generator label for stream load. */
public class LabelGenerator {
- private final String labelPrefix;
- private final boolean enable2PC;
+ private String labelPrefix;
- public LabelGenerator(String labelPrefix, boolean enable2PC) {
+ public LabelGenerator(String labelPrefix) {
this.labelPrefix = labelPrefix;
- this.enable2PC = enable2PC;
}
public String generateLabel(long chkId, int fileNum) {
- return enable2PC
- ? labelPrefix + "_" + chkId + "_" + fileNum
- : labelPrefix + "_" + System.currentTimeMillis();
+ return labelPrefix + "_" + chkId + "_" + fileNum;
}
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LoadConstants.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LoadConstants.java
similarity index 95%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LoadConstants.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LoadConstants.java
index eac655ed5..41558f6bf 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LoadConstants.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LoadConstants.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink.writer;
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
/** Constants for load. */
public class LoadConstants {
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LoadStatus.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LoadStatus.java
similarity index 93%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LoadStatus.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LoadStatus.java
index e2b56b49c..6ed428a1f 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/LoadStatus.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/LoadStatus.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink.writer;
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
/** enum of LoadStatus. */
public class LoadStatus {
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/RecordBuffer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/RecordBuffer.java
new file mode 100644
index 000000000..d0dc446c9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/RecordBuffer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.StringJoiner;
+
+@Slf4j
+public class RecordBuffer {
+ private String fileName;
+ private StringJoiner buffer;
+ private String lineDelimiter;
+ private int numOfRecords = 0;
+ private long bufferSizeBytes = 0;
+
+ public RecordBuffer() {}
+
+ public RecordBuffer(String lineDelimiter) {
+ super();
+ this.lineDelimiter = lineDelimiter;
+ this.buffer = new StringJoiner(lineDelimiter);
+ }
+
+ public void insert(String record) {
+ this.buffer.add(record);
+ setNumOfRecords(getNumOfRecords() + 1);
+ setBufferSizeBytes(getBufferSizeBytes() +
record.getBytes(StandardCharsets.UTF_8).length);
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public boolean isEmpty() {
+ return numOfRecords == 0;
+ }
+
+ public String getData() {
+ String result = buffer.toString();
+ log.debug("flush buffer: {} records, {} bytes", getNumOfRecords(),
getBufferSizeBytes());
+ return result;
+ }
+
+ public int getNumOfRecords() {
+ return numOfRecords;
+ }
+
+ public long getBufferSizeBytes() {
+ return bufferSizeBytes;
+ }
+
+ public void setNumOfRecords(int numOfRecords) {
+ this.numOfRecords = numOfRecords;
+ }
+
+ public void setBufferSizeBytes(long bufferSizeBytes) {
+ this.bufferSizeBytes = bufferSizeBytes;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkState.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkState.java
similarity index 84%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkState.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkState.java
index f2e924f31..a8b1cd878 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkState.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkState.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.sink.writer;
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -31,7 +31,10 @@ import java.io.Serializable;
public class SelectDBSinkState implements Serializable {
String labelPrefix;
- public SelectDBSinkState(String labelPrefix) {
+ long checkpointId;
+
+ public SelectDBSinkState(String labelPrefix, long checkpointId) {
this.labelPrefix = labelPrefix;
+ this.checkpointId = checkpointId;
}
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkStateSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
similarity index 90%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkStateSerializer.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
index d68043f42..1546d3b4d 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkStateSerializer.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.seatunnel.connector.selectdb.sink.writer;
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
import org.apache.seatunnel.api.serialization.Serializer;
@@ -43,7 +43,8 @@ public class SelectDBSinkStateSerializer implements
Serializer<SelectDBSinkState
try (final ByteArrayInputStream bais = new
ByteArrayInputStream(serialized);
final DataInputStream in = new DataInputStream(bais)) {
final String labelPrefix = in.readUTF();
- return new SelectDBSinkState(labelPrefix);
+ final long checkpointId = in.readLong();
+ return new SelectDBSinkState(labelPrefix, checkpointId);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
new file mode 100644
index 000000000..c284694c5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connectors.selectdb.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.selectdb.serialize.SelectDBSerializer;
+import
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfo;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkState;
+
+@Slf4j
+public class SelectDBSinkWriter
+ implements SinkWriter<SeaTunnelRow, SelectDBCommitInfo,
SelectDBSinkState> {
+ private final SelectDBConfig selectdbConfig;
+ private final long lastCheckpointId;
+ private SelectDBStageLoad selectDBStageLoad;
+ volatile boolean loading;
+ private final String labelPrefix;
+ private final byte[] lineDelimiter;
+ private final LabelGenerator labelGenerator;
+ private final SelectDBSinkState selectdbSinkState;
+ private final SelectDBSerializer serializer;
+
+ public SelectDBSinkWriter(
+ SinkWriter.Context context,
+ List<SelectDBSinkState> state,
+ SeaTunnelRowType seaTunnelRowType,
+ Config pluginConfig) {
+ this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
+ this.lastCheckpointId = state.size() != 0 ?
state.get(0).getCheckpointId() : 0;
+ log.info("restore checkpointId {}", lastCheckpointId);
+ // filename prefix is uuid
+ log.info("labelPrefix " + selectdbConfig.getLabelPrefix());
+ this.selectdbSinkState =
+ new SelectDBSinkState(selectdbConfig.getLabelPrefix(),
lastCheckpointId);
+ this.labelPrefix = selectdbConfig.getLabelPrefix() + "_" +
context.getIndexOfSubtask();
+ this.lineDelimiter =
+ selectdbConfig
+ .getStageLoadProps()
+ .getProperty(
+ LoadConstants.LINE_DELIMITER_KEY,
+ LoadConstants.LINE_DELIMITER_DEFAULT)
+ .getBytes();
+ this.labelGenerator = new LabelGenerator(labelPrefix);
+ this.serializer = createSerializer(selectdbConfig, seaTunnelRowType);
+ this.loading = false;
+ }
+
+ public void initializeLoad(List<SelectDBSinkState> state) throws
IOException {
+ this.selectDBStageLoad = new SelectDBStageLoad(selectdbConfig,
labelGenerator);
+ this.selectDBStageLoad.setCurrentCheckpointID(lastCheckpointId + 1);
+ serializer.open();
+ }
+
+ @Override
+ public synchronized void write(SeaTunnelRow element) throws IOException {
+ byte[] serialize = serializer.serialize(element);
+ if (Objects.isNull(serialize)) {
+ // schema change is null
+ return;
+ }
+ try {
+ this.selectDBStageLoad.writeRecord(serialize);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public synchronized Optional<SelectDBCommitInfo> prepareCommit() throws
IOException {
+ checkState(selectDBStageLoad != null);
+ log.info("checkpoint arrived, upload buffer to storage");
+ try {
+ this.selectDBStageLoad.flush(true);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ CopySQLBuilder copySQLBuilder =
+ new CopySQLBuilder(selectdbConfig,
selectDBStageLoad.getFileList());
+ String copySql = copySQLBuilder.buildCopySQL();
+ return Optional.of(
+ new SelectDBCommitInfo(
+ selectDBStageLoad.getHostPort(),
selectdbConfig.getClusterName(), copySql));
+ }
+
+ @Override
+ public synchronized List<SelectDBSinkState> snapshotState(long
checkpointId)
+ throws IOException {
+ checkState(selectDBStageLoad != null);
+ log.info("clear the file list {}", selectDBStageLoad.getFileList());
+ this.selectDBStageLoad.clearFileList();
+ this.selectDBStageLoad.setCurrentCheckpointID(checkpointId + 1);
+ return Collections.singletonList(selectdbSinkState);
+ }
+
+ @Override
+ public void abortPrepare() {}
+
+ @Override
+ public void close() throws IOException {
+ if (selectDBStageLoad != null) {
+ selectDBStageLoad.close();
+ }
+ serializer.close();
+ }
+
+ public static SelectDBSerializer createSerializer(
+ SelectDBConfig selectdbConfig, SeaTunnelRowType seaTunnelRowType) {
+ return new SeaTunnelRowSerializer(
+ selectdbConfig
+ .getStageLoadProps()
+ .getProperty(LoadConstants.FORMAT_KEY)
+ .toLowerCase(),
+ seaTunnelRowType,
+
selectdbConfig.getStageLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
+ selectdbConfig.getEnableDelete());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
new file mode 100644
index 000000000..44aaf8977
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBStageLoad.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.selectdb.sink.writer;
+
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorErrorCode;
+import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connectors.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connectors.selectdb.util.HttpPutBuilder;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static
org.apache.seatunnel.connectors.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+
+@Slf4j
+public class SelectDBStageLoad implements Serializable {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final LabelGenerator labelGenerator;
+ private final String lineDelimiter;
+ private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
+
+ private final SelectDBConfig selectdbConfig;
+ private String uploadUrl;
+ private String hostPort;
+ private final String username;
+ private final String password;
+ private final String db;
+ private final String table;
+ private final Properties stageLoadProps;
+ private List<String> fileList = new CopyOnWriteArrayList();
+ private RecordBuffer buffer;
+ private long currentCheckpointID;
+ private AtomicInteger fileNum;
+ private ExecutorService loadExecutorService;
+ private StageLoadAsyncExecutor loadAsyncExecutor;
+ private ArrayBlockingQueue<RecordBuffer> queue;
+ private final AtomicBoolean started;
+ private AtomicReference<Throwable> exception = new AtomicReference<>(null);
+ private HttpClientBuilder httpClientBuilder =
HttpClients.custom().disableRedirectHandling();
+
+ public SelectDBStageLoad(SelectDBConfig selectdbConfig, LabelGenerator
labelGenerator) {
+ this.selectdbConfig = selectdbConfig;
+ this.hostPort = selectdbConfig.getLoadUrl();
+ String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
+ this.db = tableInfo[0];
+ this.table = tableInfo[1];
+ this.username = selectdbConfig.getUsername();
+ this.password = selectdbConfig.getPassword();
+ this.labelGenerator = labelGenerator;
+ this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
+ this.stageLoadProps = selectdbConfig.getStageLoadProps();
+ this.lineDelimiter = stageLoadProps.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
+ this.fileNum = new AtomicInteger();
+ this.buffer = new RecordBuffer(lineDelimiter);
+ this.queue = new
ArrayBlockingQueue<>(selectdbConfig.getFlushQueueSize());
+ this.loadAsyncExecutor = new StageLoadAsyncExecutor();
+ this.loadExecutorService =
+ new ThreadPoolExecutor(
+ 1,
+ 1,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1),
+ new DefaultThreadFactory("upload-executor"),
+ new ThreadPoolExecutor.AbortPolicy());
+ this.started = new AtomicBoolean(true);
+ this.loadExecutorService.execute(loadAsyncExecutor);
+ }
+
+ public String getHostPort() {
+ return hostPort;
+ }
+
+ public List<String> getFileList() {
+ return fileList;
+ }
+
+ public void clearFileList() {
+ this.fileNum.set(0);
+ fileList.clear();
+ }
+
+ /**
+ * write record into cache.
+ *
+ * @param record
+ * @throws IOException
+ */
+ public void writeRecord(byte[] record) throws InterruptedException {
+ buffer.insert(new String(record, StandardCharsets.UTF_8));
+ if (buffer.getBufferSizeBytes() >= selectdbConfig.getBufferSize()
+ || (selectdbConfig.getBufferCount() != 0
+ && buffer.getNumOfRecords() >=
selectdbConfig.getBufferCount())) {
+ flush(false);
+ }
+ }
+
+ public void flush(boolean waitUtilDone) throws InterruptedException {
+ checkFlushException();
+ if (buffer == null) {
+ return;
+ }
+ String fileName =
+ labelGenerator.generateLabel(currentCheckpointID,
fileNum.getAndIncrement());
+ buffer.setFileName(fileName);
+ RecordBuffer tmpBuff = buffer;
+ log.info("flush buffer to queue, actual queue size {}", queue.size());
+ offer(tmpBuff);
+ if (waitUtilDone) {
+ waitAsyncLoadFinish();
+ }
+ this.buffer = new RecordBuffer(this.lineDelimiter);
+ }
+
+ private void offer(RecordBuffer buffer) throws InterruptedException {
+ checkFlushException();
+ if (!queue.offer(buffer, 600 * 1000, TimeUnit.MILLISECONDS)) {
+ throw new SelectDBConnectorException(
+ SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
+ "offer data to queue timeout, exceed ");
+ }
+ }
+
+ private void checkFlushException() {
+ if (exception.get() != null) {
+ throw new SelectDBConnectorException(
+ SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
exception.get());
+ }
+ }
+
+ private void waitAsyncLoadFinish() throws InterruptedException {
+ for (int i = 0; i < selectdbConfig.getFlushQueueSize() + 1; i++) {
+ offer(new RecordBuffer());
+ }
+ }
+
+ public void close() {
+ this.loadExecutorService.shutdown();
+ }
+
+ public void setCurrentCheckpointID(long currentCheckpointID) {
+ this.currentCheckpointID = currentCheckpointID;
+ }
+
+ @VisibleForTesting
+ public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
+ this.httpClientBuilder = httpClientBuilder;
+ }
+
+ class StageLoadAsyncExecutor implements Runnable {
+ @Override
+ public void run() {
+ log.info("StageLoadAsyncExecutor start");
+ while (started.get()) {
+ try {
+ RecordBuffer buffer = queue.poll(2000L,
TimeUnit.MILLISECONDS);
+ if (buffer != null && buffer.getFileName() != null) {
+ uploadToStorage(buffer.getFileName(), buffer);
+ fileList.add(buffer.getFileName());
+ }
+ } catch (Exception e) {
+ log.error("worker running error", e);
+ exception.set(e);
+ break;
+ }
+ }
+ log.info("StageLoadAsyncExecutor stop");
+ }
+
+ /** upload to storage */
+ public void uploadToStorage(String fileName, RecordBuffer buffer) {
+ long start = System.currentTimeMillis();
+ log.info("file write started for {}", fileName);
+ String address = getUploadAddress(fileName);
+ log.info("redirect to internalStage address:{}", address);
+ uploadToInternalStage(address,
buffer.getData().getBytes(StandardCharsets.UTF_8));
+ log.info(
+ "upload file {} finished, record {} size {}, cost {}ms ",
+ fileName,
+ buffer.getNumOfRecords(),
+ buffer.getBufferSizeBytes(),
+ System.currentTimeMillis() - start);
+ }
+
+ public BaseResponse uploadToInternalStage(String address, byte[] data)
+ throws SelectDBConnectorException {
+ ByteArrayEntity entity = new ByteArrayEntity(data);
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(address).addCommonHeader().setEntity(entity);
+ HttpPut httpPut = putBuilder.build();
+ try {
+ try (CloseableHttpResponse response =
httpClientBuilder.build().execute(httpPut)) {
+ final int statusCode =
response.getStatusLine().getStatusCode();
+ if (statusCode == 200 && response.getEntity() != null) {
+ String loadResult =
EntityUtils.toString(response.getEntity());
+ if (loadResult == null || loadResult.isEmpty()) {
+ // upload finished
+ return null;
+ }
+ throw new SelectDBConnectorException(
+ SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
+ "upload file failed: " +
response.getStatusLine().toString());
+ }
+ throw new SelectDBConnectorException(
+ SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
+ "upload file error: " +
response.getStatusLine().toString());
+ }
+ } catch (IOException ex) {
+ throw new SelectDBConnectorException(
+ SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
+ "Failed to upload data to internal stage",
+ ex);
+ }
+ }
+
+ /** Get the redirected s3 address */
+ public String getUploadAddress(String fileName) throws
SelectDBConnectorException {
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder
+ .setUrl(uploadUrl)
+ .addFileName(fileName)
+ .addCommonHeader()
+ .setEmptyEntity()
+ .baseAuth(username, password);
+ try {
+ try (CloseableHttpResponse execute =
+ httpClientBuilder.build().execute(putBuilder.build()))
{
+ int statusCode = execute.getStatusLine().getStatusCode();
+ String reason = execute.getStatusLine().getReasonPhrase();
+ if (statusCode == 307) {
+ Header location = execute.getFirstHeader("location");
+ String uploadAddress = location.getValue();
+ return uploadAddress;
+ } else {
+ HttpEntity entity = execute.getEntity();
+ String result = entity == null ? null :
EntityUtils.toString(entity);
+ String errMsg =
+ String.format(
+ "Failed to get internalStage address,
status {}, reason {}, response {}",
+ statusCode,
+ reason,
+ result);
+ throw new SelectDBConnectorException(
+ SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
errMsg);
+ }
+ }
+ } catch (IOException e) {
+ throw new SelectDBConnectorException(
+ SelectDBConnectorErrorCode.STAGE_LOAD_FAILED,
+ "get internalStage address error",
+ e);
+ }
+ }
+ }
+
+ static class DefaultThreadFactory implements ThreadFactory {
+ private static final AtomicInteger poolNumber = new AtomicInteger(1);
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ DefaultThreadFactory(String name) {
+ namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name +
"-";
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, namePrefix +
threadNumber.getAndIncrement());
+ t.setDaemon(false);
+ return t;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpPostBuilder.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpPostBuilder.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpPostBuilder.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpPostBuilder.java
index d82798d51..c53c67291 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpPostBuilder.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpPostBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.util;
+package org.apache.seatunnel.connectors.selectdb.util;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpPutBuilder.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpPutBuilder.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpPutBuilder.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpPutBuilder.java
index 98e80949a..01a574f6a 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpPutBuilder.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpPutBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.util;
+package org.apache.seatunnel.connectors.selectdb.util;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpUtil.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
similarity index 95%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpUtil.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
index a3a5fdf07..9ccf55451 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/HttpUtil.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/HttpUtil.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.util;
+package org.apache.seatunnel.connectors.selectdb.util;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/ResponseUtil.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/ResponseUtil.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/ResponseUtil.java
rename to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/ResponseUtil.java
index 4111ecda4..d2a161f64 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/util/ResponseUtil.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/util/ResponseUtil.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connector.selectdb.util;
+package org.apache.seatunnel.connectors.selectdb.util;
import java.util.regex.Pattern;