This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b2375fffe8 [Feature] Support SaveMode on Doris (#6085)
b2375fffe8 is described below
commit b2375fffe803083ea89c201e38c8396e2def7cbe
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 4 14:52:10 2024 +0800
[Feature] Support SaveMode on Doris (#6085)
---
docs/en/connector-v2/sink/Doris.md | 112 +++++--
.../seatunnel/api/sink/DefaultSaveModeHandler.java | 3 +
.../apache/seatunnel/api/table/catalog/Column.java | 3 +
.../api/table/catalog/MetadataColumn.java | 12 +
.../api/table/catalog/PhysicalColumn.java | 17 +
.../connectors/doris/catalog/DorisCatalog.java | 25 ++
.../doris/catalog/DorisCatalogFactory.java | 4 +
.../connectors/doris/config/DorisConfig.java | 6 +-
.../connectors/doris/config/DorisOptions.java | 94 ++++--
.../connectors/doris/rest/PartitionDefinition.java | 147 --------
.../connectors/doris/rest/RestService.java | 373 +--------------------
.../connectors/doris/rest/models/Backend.java | 40 ---
.../connectors/doris/rest/models/BackendRow.java | 41 ---
.../connectors/doris/rest/models/Field.java | 134 --------
.../connectors/doris/rest/models/QueryPlan.java | 70 ----
.../connectors/doris/rest/models/Schema.java | 108 ------
.../connectors/doris/rest/models/Tablet.java | 80 -----
.../seatunnel/connectors/doris/sink/DorisSink.java | 47 ++-
.../connectors/doris/sink/DorisSinkFactory.java | 59 +++-
.../doris/sink/writer/DorisSinkState.java | 4 +-
.../doris/sink/writer/DorisSinkWriter.java | 33 +-
.../doris/sink/writer/DorisStreamLoad.java | 7 +-
.../connectors/doris/util/DorisCatalogUtil.java | 80 +++--
.../seatunnel/connectors/doris/util/IOUtils.java | 49 ---
.../doris/util/UnsupportedTypeConverterUtils.java | 101 ++++++
.../doris/catalog/DorisCreateTableTest.java | 280 ++++++++++++++++
.../e2e/connector/doris/DorisCatalogIT.java | 152 +++++++--
.../resources/write-cdc-changelog-to-doris.conf | 3 +-
.../src/test/resources/doris-jdbc-to-doris.conf | 3 +-
29 files changed, 935 insertions(+), 1152 deletions(-)
diff --git a/docs/en/connector-v2/sink/Doris.md
b/docs/en/connector-v2/sink/Doris.md
index de0c47453a..a485eaf8c7 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -32,21 +32,91 @@ Version Supported
## Sink Options
-| Name | Type | Required | Default |
Description
|
-|---------------------|--------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| fenodes | String | Yes | - | `Doris` cluster
fenodes address, the format is `"fe_ip:fe_http_port, ..."`
|
-| username | String | Yes | - | `Doris` user username
|
-| password | String | Yes | - | `Doris` user password
|
-| table.identifier | String | Yes | - | The name of `Doris`
table
|
-| sink.label-prefix | String | Yes | - | The label prefix used
by stream load imports. In the 2pc scenario, global uniqueness is required to
ensure the EOS semantics of SeaTunnel.
|
-| sink.enable-2pc | bool | No | - | Whether to enable
two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics.
For two-phase commit, please refer to
[here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).
|
-| sink.enable-delete | bool | No | - | Whether to enable
deletion. This option requires Doris table to enable batch delete function
(0.15+ version is enabled by default), and only supports Unique model. you can
get more detail at this
[link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual)
|
-| sink.check-interval | int | No | 10000 | check exception with
the interval while loading
|
-| sink.max-retries | int | No | 3 | the max retry times
if writing records to database failed
|
-| sink.buffer-size | int | No | 256 * 1024 | the buffer size to
cache data for stream load.
|
-| sink.buffer-count | int | No | 3 | the buffer count to
cache data for stream load.
|
-| doris.batch.size | int | No | 1024 | the batch size of the
write to doris each http request, when the row reaches the size or checkpoint
is executed, the data of cached will write to server.
|
-| doris.config | map | yes | - | This option is used
to support operations such as `insert`, `delete`, and `update` when
automatically generate sql,and supported formats.
|
+| Name | Type | Required | Default
|
Description
|
+|--------------------------------|---------|----------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| fenodes | String | Yes | -
| `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port,
..."`
|
+| query-port | int | No | 9030
| `Doris` Fenodes query_port
|
+| username | String | Yes | -
| `Doris` user username
|
+| password | String | Yes | -
| `Doris` user password
|
+| database | String | Yes | -
| The database name of `Doris` table, use `${database_name}` to represent
the upstream table name
|
+| table | String | Yes | -
| The table name of `Doris` table, use `${table_name}` to represent the
upstream table name
|
+| table.identifier | String | Yes | -
| The name of `Doris` table, it will deprecate after version 2.3.5,
please use `database` and `table` instead.
|
+| sink.label-prefix | String | Yes | -
| The label prefix used by stream load imports. In the 2pc scenario,
global uniqueness is required to ensure the EOS semantics of SeaTunnel.
|
+| sink.enable-2pc | bool | No | -
| Whether to enable two-phase commit (2pc), the default is true, to
ensure Exactly-Once semantics. For two-phase commit, please refer to
[here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).
|
+| sink.enable-delete | bool | No | -
| Whether to enable deletion. This option requires Doris table to enable
batch delete function (0.15+ version is enabled by default), and only supports
Unique model. you can get more detail at this
[link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual)
|
+| sink.check-interval | int | No | 10000
| check exception with the interval while loading
|
+| sink.max-retries | int | No | 3
| the max retry times if writing records to database failed
|
+| sink.buffer-size | int | No | 256 * 1024
| the buffer size to cache data for stream load.
|
+| sink.buffer-count | int | No | 3
| the buffer count to cache data for stream load.
|
+| doris.batch.size | int | No | 1024
| the batch size of the write to doris each http request, when the row
reaches the size or checkpoint is executed, the data of cached will write to
server.
|
+| needs_unsupported_type_casting | boolean | No | false
| Whether to enable the unsupported type casting, such as Decimal64 to
Double
|
+| schema_save_mode | Enum | no |
CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to
`schema_save_mode` below
|
+| data_save_mode | Enum | no | APPEND_DATA
| the data save mode, please refer to `data_save_mode` below
|
+| save_mode_create_template | string | no | see below
| see below
|
+| custom_sql | String | no | -
| When data_save_mode selects CUSTOM_PROCESSING, you should fill in the
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be
executed. SQL will be executed before synchronization tasks.
|
+| doris.config | map | yes | -
| This option is used to support operations such as `insert`, `delete`,
and `update` when automatically generate sql,and supported formats.
|
+
+### schema_save_mode[Enum]
+
+Before the synchronous task is turned on, different treatment schemes are
selected for the existing surface structure of the target side.
+Option introduction:
+`RECREATE_SCHEMA` :Will create when the table does not exist, delete and
rebuild when the table is saved
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist,
skipped when the table is saved
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not
exist
+
+### data_save_mode[Enum]
+
+Before the synchronous task is turned on, different processing schemes are
selected for data existing data on the target side.
+Option introduction:
+`DROP_DATA`: Preserve database structure and delete data
+`APPEND_DATA`:Preserve database structure, preserve data
+`CUSTOM_PROCESSING`:User defined processing
+`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported
+
+### save_mode_create_template
+
+We use templates to automatically create Doris tables,
+which will create corresponding table creation statements based on the type of
upstream data and schema type,
+and the default template can be modified according to the situation.
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
+(
+ ${rowtype_fields}
+) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
+ DISTRIBUTED BY HASH (${rowtype_primary_key})
+ PROPERTIES
+(
+ "replication_num" = "1"
+);
+```
+
+If a custom field is filled in the template, such as adding an `id` field
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
+(
+ id,
+ ${rowtype_fields}
+) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
+ DISTRIBUTED BY HASH (${rowtype_primary_key})
+ PROPERTIES
+(
+ "replication_num" = "1"
+);
+```
+
+The connector will automatically obtain the corresponding type from the
upstream to complete the filling,
+and remove the id field from `rowtype_fields`. This method can be used to
customize the modification of field types and attributes.
+
+You can use the following placeholders
+
+- database: Used to get the database in the upstream schema
+- table_name: Used to get the table name in the upstream schema
+- rowtype_fields: Used to get all the fields in the upstream schema, we will
automatically map to the field
+ description of Doris
+- rowtype_primary_key: Used to get the primary key in the upstream schema
(maybe a list)
+- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe
a list)
## Data Type Mapping
@@ -125,7 +195,8 @@ sink {
fenodes = "doris_cdc_e2e:8030"
username = root
password = ""
- table.identifier = "test.e2e_table_sink"
+ database = "test"
+ table = "e2e_table_sink"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
@@ -197,7 +268,8 @@ sink {
fenodes = "doris_cdc_e2e:8030"
username = root
password = ""
- table.identifier = "test.e2e_table_sink"
+ database = "test"
+ table = "e2e_table_sink"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
@@ -218,7 +290,8 @@ sink {
fenodes = "e2e_dorisdb:8030"
username = root
password = ""
- table.identifier = "test.e2e_table_sink"
+ database = "test"
+ table = "e2e_table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
@@ -238,7 +311,8 @@ sink {
fenodes = "e2e_dorisdb:8030"
username = root
password = ""
- table.identifier = "test.e2e_table_sink"
+ database = "test"
+ table = "e2e_table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_csv"
doris.config = {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index e9ce7b261c..9566658979 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -136,6 +136,9 @@ public class DefaultSaveModeHandler implements
SaveModeHandler {
}
protected void createTable() {
+ if (!catalog.databaseExists(tablePath.getDatabaseName())) {
+ catalog.createDatabase(TablePath.of(tablePath.getDatabaseName(),
""), true);
+ }
catalog.createTable(tablePath, catalogTable, true);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index bec10b3d75..109d025695 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -133,4 +133,7 @@ public abstract class Column implements Serializable {
/** Returns a copy of the column. */
public abstract Column copy();
+
+ /** Returns a copy of the column with a replaced name. */
+ public abstract Column rename(String newColumnName);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
index 5325dac791..9294c13506 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -70,4 +70,16 @@ public class MetadataColumn extends Column {
return MetadataColumn.of(
name, dataType, columnLength, metadataKey, nullable,
defaultValue, comment);
}
+
+ @Override
+ public Column rename(String newColumnName) {
+ return MetadataColumn.of(
+ newColumnName,
+ dataType,
+ columnLength,
+ metadataKey,
+ nullable,
+ defaultValue,
+ comment);
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
index 164752d468..926dcc2eba 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
@@ -144,4 +144,21 @@ public class PhysicalColumn extends Column {
options,
longColumnLength);
}
+
+ @Override
+ public Column rename(String newColumnName) {
+ return PhysicalColumn.of(
+ newColumnName,
+ dataType,
+ columnLength,
+ nullable,
+ defaultValue,
+ comment,
+ sourceType,
+ isUnsigned,
+ isZeroFill,
+ bitLen,
+ options,
+ longColumnLength);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index 071bf1f608..0e5faef550 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -334,4 +334,29 @@ public class DorisCatalog implements Catalog {
options.put(DorisOptions.PASSWORD.key(), password);
return options;
}
+
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ if (ignoreIfNotExists) {
+ conn.createStatement()
+ .execute(String.format("TRUNCATE TABLE %s",
tablePath.getFullName()));
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed TRUNCATE TABLE in catalog %s",
tablePath.getFullName()),
+ e);
+ }
+ }
+
+ public boolean isExistsData(TablePath tablePath) {
+ String tableName = tablePath.getFullName();
+ String sql = String.format("select * from %s limit 1;", tableName);
+ try (PreparedStatement ps = conn.prepareStatement(sql)) {
+ ResultSet resultSet = ps.executeQuery();
+ return resultSet.next();
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Failed executeSql error
%s", sql), e);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
index 3df6bcca24..16db24f48a 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
@@ -21,9 +21,13 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
public class DorisCatalogFactory implements CatalogFactory {
@Override
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
index 98a43c7b8e..ea93b8f551 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
@@ -41,6 +41,7 @@ import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_RE
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_SINK_CONFIG_PREFIX;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_TABLET_SIZE;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
+import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE_CREATE_TEMPLATE;
@@ -51,7 +52,6 @@ import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENA
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENABLE_DELETE;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_LABEL_PREFIX;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_MAX_RETRIES;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
@Setter
@@ -64,7 +64,6 @@ public class DorisConfig implements Serializable {
private String username;
private String password;
private Integer queryPort;
- private String tableIdentifier;
private int batchSize;
// source option
@@ -89,6 +88,7 @@ public class DorisConfig implements Serializable {
private Integer bufferSize;
private Integer bufferCount;
private Properties streamLoadProps;
+ private boolean needsUnsupportedTypeCasting;
// create table option
private String createTableTemplate;
@@ -105,7 +105,6 @@ public class DorisConfig implements Serializable {
dorisConfig.setFrontends(config.get(FENODES));
dorisConfig.setUsername(config.get(USERNAME));
dorisConfig.setPassword(config.get(PASSWORD));
- dorisConfig.setTableIdentifier(config.get(TABLE_IDENTIFIER));
dorisConfig.setQueryPort(config.get(QUERY_PORT));
dorisConfig.setStreamLoadProps(parseStreamLoadProperties(config));
@@ -129,6 +128,7 @@ public class DorisConfig implements Serializable {
dorisConfig.setBufferSize(config.get(SINK_BUFFER_SIZE));
dorisConfig.setBufferCount(config.get(SINK_BUFFER_COUNT));
dorisConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE));
+
dorisConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING));
// create table option
dorisConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE));
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 04a4f02851..7dd4612c48 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -17,17 +17,18 @@
package org.apache.seatunnel.connectors.doris.config;
-import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import java.util.Map;
+import static
org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
+
public interface DorisOptions {
- int DORIS_TABLET_SIZE_MIN = 1;
int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
@@ -36,32 +37,11 @@ public interface DorisOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
int DORIS_BATCH_SIZE_DEFAULT = 1024;
- long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
int DEFAULT_SINK_CHECK_INTERVAL = 10000;
int DEFAULT_SINK_MAX_RETRIES = 3;
int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024;
int DEFAULT_SINK_BUFFER_COUNT = 3;
- Map<String, String> DEFAULT_CREATE_PROPERTIES =
- ImmutableMap.of(
- "replication_allocation", "tag.location.default: 3",
- "storage_format", "V2",
- "disable_auto_compaction", "false");
-
- String DEFAULT_CREATE_TEMPLATE =
- "CREATE TABLE ${table_identifier}\n"
- + "(\n"
- + "${column_definition}\n"
- + ")\n"
- + "ENGINE = ${engine_type}\n"
- + "UNIQUE KEY (${key_columns})\n"
- + "COMMENT ${table_comment}\n"
- + "${partition_info}\n"
- + "DISTRIBUTED BY HASH (${distribution_columns}) BUCKETS
${distribution_bucket}\n"
- + "PROPERTIES (\n"
- + "${properties}\n"
- + ")\n";
-
// common option
Option<String> FENODES =
Options.key("fenodes")
@@ -75,11 +55,13 @@ public interface DorisOptions {
.defaultValue(9030)
.withDescription("doris query port");
+ @Deprecated
Option<String> TABLE_IDENTIFIER =
Options.key("table.identifier")
.stringType()
.noDefaultValue()
.withDescription("the doris table name.");
+
Option<String> USERNAME =
Options.key("username")
.stringType()
@@ -90,6 +72,17 @@ public interface DorisOptions {
.stringType()
.noDefaultValue()
.withDescription("the doris password.");
+
+ Option<String> TABLE =
+ Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the doris table name.");
+ Option<String> DATABASE =
+ Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the doris database name.");
Option<Integer> DORIS_BATCH_SIZE =
Options.key("doris.batch.size")
.intType()
@@ -197,6 +190,28 @@ public interface DorisOptions {
.defaultValue("information_schema")
.withDescription("");
+ Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription("schema_save_mode");
+
+ Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .enumType(DataSaveMode.class)
+ .defaultValue(DataSaveMode.APPEND_DATA)
+ .withDescription("data_save_mode");
+
+ Option<String> CUSTOM_SQL =
+
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
+
+ Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
+ Options.key("needs_unsupported_type_casting")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable the unsupported type casting,
such as Decimal64 to Double");
+
// create table
Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
@@ -205,17 +220,38 @@ public interface DorisOptions {
"CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n"
+ "${rowtype_fields}\n"
+ ") ENGINE=OLAP\n"
- + "UNIQUE KEY (${rowtype_primary_key})\n"
- + "DISTRIBUTED BY HASH
(${rowtype_primary_key})\n"
+ + " UNIQUE KEY (${rowtype_primary_key})\n"
+ + "DISTRIBUTED BY HASH
(${rowtype_primary_key})\n "
+ "PROPERTIES (\n"
- + " \"replication_num\" = \"1\" \n"
+ + "\"replication_allocation\" =
\"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" =
\"false\"\n"
+ ")")
.withDescription("Create table statement template, used to
create Doris table");
OptionRule.Builder SINK_RULE =
OptionRule.builder()
- .required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER)
- .optional(DORIS_BATCH_SIZE);
+ .required(
+ FENODES,
+ USERNAME,
+ PASSWORD,
+ SINK_LABEL_PREFIX,
+ DORIS_SINK_CONFIG_PREFIX,
+ DATA_SAVE_MODE,
+ SCHEMA_SAVE_MODE)
+ .optional(
+ DATABASE,
+ TABLE,
+ TABLE_IDENTIFIER,
+ QUERY_PORT,
+ DORIS_BATCH_SIZE,
+ SINK_ENABLE_2PC,
+ SINK_ENABLE_DELETE,
+ MULTI_TABLE_SINK_REPLICA,
+ SAVE_MODE_CREATE_TEMPLATE,
+ NEEDS_UNSUPPORTED_TYPE_CASTING)
+ .conditional(DATA_SAVE_MODE,
DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL);
OptionRule.Builder CATALOG_RULE =
OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME,
PASSWORD);
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
deleted file mode 100644
index 884cd7bde4..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
+++ /dev/null
@@ -1,147 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.seatunnel.connectors.doris.rest;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-/** Doris partition info. */
-public class PartitionDefinition implements Serializable,
Comparable<PartitionDefinition> {
- private final String database;
- private final String table;
-
- private final String beAddress;
- private final Set<Long> tabletIds;
- private final String queryPlan;
-
- public PartitionDefinition(
- String database, String table, String beAddress, Set<Long>
tabletIds, String queryPlan)
- throws IllegalArgumentException {
- this.database = database;
- this.table = table;
- this.beAddress = beAddress;
- this.tabletIds = tabletIds;
- this.queryPlan = queryPlan;
- }
-
- public String getBeAddress() {
- return beAddress;
- }
-
- public Set<Long> getTabletIds() {
- return tabletIds;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public String getTable() {
- return table;
- }
-
- public String getQueryPlan() {
- return queryPlan;
- }
-
- @Override
- public int compareTo(PartitionDefinition o) {
- int cmp = database.compareTo(o.database);
- if (cmp != 0) {
- return cmp;
- }
- cmp = table.compareTo(o.table);
- if (cmp != 0) {
- return cmp;
- }
- cmp = beAddress.compareTo(o.beAddress);
- if (cmp != 0) {
- return cmp;
- }
- cmp = queryPlan.compareTo(o.queryPlan);
- if (cmp != 0) {
- return cmp;
- }
-
- cmp = tabletIds.size() - o.tabletIds.size();
- if (cmp != 0) {
- return cmp;
- }
-
- Set<Long> similar = new HashSet<>(tabletIds);
- Set<Long> diffSelf = new HashSet<>(tabletIds);
- Set<Long> diffOther = new HashSet<>(o.tabletIds);
- similar.retainAll(o.tabletIds);
- diffSelf.removeAll(similar);
- diffOther.removeAll(similar);
- if (diffSelf.size() == 0) {
- return 0;
- }
- long diff = Collections.min(diffSelf) - Collections.min(diffOther);
- return diff < 0 ? -1 : 1;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PartitionDefinition that = (PartitionDefinition) o;
- return Objects.equals(database, that.database)
- && Objects.equals(table, that.table)
- && Objects.equals(beAddress, that.beAddress)
- && Objects.equals(tabletIds, that.tabletIds)
- && Objects.equals(queryPlan, that.queryPlan);
- }
-
- @Override
- public int hashCode() {
- int result = database.hashCode();
- result = 31 * result + table.hashCode();
- result = 31 * result + beAddress.hashCode();
- result = 31 * result + queryPlan.hashCode();
- result = 31 * result + tabletIds.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "PartitionDefinition{"
- + ", database='"
- + database
- + '\''
- + ", table='"
- + table
- + '\''
- + ", beAddress='"
- + beAddress
- + '\''
- + ", tabletIds="
- + tabletIds
- + ", queryPlan='"
- + queryPlan
- + '\''
- + '}';
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index e1c83a0700..e8b66d4497 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -21,12 +21,7 @@ import
org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.models.Backend;
-import org.apache.seatunnel.connectors.doris.rest.models.BackendRow;
import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
-import org.apache.seatunnel.connectors.doris.rest.models.QueryPlan;
-import org.apache.seatunnel.connectors.doris.rest.models.Schema;
-import org.apache.seatunnel.connectors.doris.rest.models.Tablet;
import org.apache.seatunnel.connectors.doris.util.ErrorMessages;
import org.apache.commons.io.IOUtils;
@@ -36,7 +31,6 @@ import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
@@ -55,30 +49,16 @@ import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
public class RestService implements Serializable {
- public static final int REST_RESPONSE_STATUS_OK = 200;
- public static final int REST_RESPONSE_CODE_OK = 0;
- private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
- private static final String API_PREFIX = "/api";
- private static final String SCHEMA = "_schema";
- private static final String QUERY_PLAN = "_query_plan";
- private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
- @Deprecated private static final String BACKENDS =
"/rest/v1/system?path=//backends";
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
- private static final String FE_LOGIN = "/rest/v1/login";
- private static final String BASE_URL = "http://%s%s";
private static String send(DorisConfig dorisConfig, HttpRequestBase
request, Logger logger)
throws DorisConnectorException {
@@ -133,7 +113,7 @@ public class RestService implements Serializable {
dorisConfig.getPassword(),
logger);
}
- if (response == null) {
+ if (StringUtils.isEmpty(response)) {
logger.warn(
"Failed to get response from Doris FE {}, http
code is {}",
request.getURI(),
@@ -233,43 +213,6 @@ public class RestService implements Serializable {
return result.toString();
}
- @VisibleForTesting
- static String[] parseIdentifier(String tableIdentifier, Logger logger)
- throws DorisConnectorException {
- logger.trace("Parse identifier '{}'.", tableIdentifier);
- if (StringUtils.isEmpty(tableIdentifier)) {
- String errMsg =
- String.format(
- ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
- "table.identifier",
- tableIdentifier);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- String[] identifier = tableIdentifier.split("\\.");
- if (identifier.length != 2) {
- String errMsg =
- String.format(
- ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
- "table.identifier",
- tableIdentifier);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- return identifier;
- }
-
- @VisibleForTesting
- static String randomEndpoint(String feNodes, Logger logger) throws
DorisConnectorException {
- logger.trace("Parse fenodes '{}'.", feNodes);
- if (StringUtils.isEmpty(feNodes)) {
- String errMsg =
- String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
"fenodes", feNodes);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- List<String> nodes = Arrays.asList(feNodes.split(","));
- Collections.shuffle(nodes);
- return nodes.get(0).trim();
- }
-
@VisibleForTesting
static List<String> allEndpoints(String feNodes, Logger logger) throws
DorisConnectorException {
logger.trace("Parse fenodes '{}'.", feNodes);
@@ -286,7 +229,7 @@ public class RestService implements Serializable {
@VisibleForTesting
public static String randomBackend(DorisConfig dorisConfig, Logger logger)
- throws DorisConnectorException, IOException {
+ throws DorisConnectorException {
List<BackendV2.BackendRowV2> backends = getBackendsV2(dorisConfig,
logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
@@ -311,59 +254,9 @@ public class RestService implements Serializable {
}
}
- @Deprecated
- @VisibleForTesting
- static List<BackendRow> getBackends(DorisConfig dorisConfig, Logger logger)
- throws DorisConnectorException, IOException {
- String feNodes = dorisConfig.getFrontends();
- String feNode = randomEndpoint(feNodes, logger);
- String beUrl = String.format(BASE_URL, feNode, BACKENDS);
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(dorisConfig, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List<BackendRow> backends = parseBackend(response, logger);
- return backends;
- }
-
- @Deprecated
- static List<BackendRow> parseBackend(String response, Logger logger)
- throws DorisConnectorException, IOException {
- ObjectMapper mapper = new ObjectMapper();
- Backend backend;
- try {
- backend = mapper.readValue(response, Backend.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris BE's response is not a json. res: " +
response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris BE's response cannot map to schema. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris BE's response to json failed. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- }
-
- if (backend == null) {
- logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED,
- ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
- }
- List<BackendRow> backendRows =
- backend.getRows().stream().filter(v ->
v.getAlive()).collect(Collectors.toList());
- logger.debug("Parsing schema result is '{}'.", backendRows);
- return backendRows;
- }
-
@VisibleForTesting
public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig
dorisConfig, Logger logger)
- throws DorisConnectorException, IOException {
+ throws DorisConnectorException {
String feNodes = dorisConfig.getFrontends();
List<String> feNodeList = allEndpoints(feNodes, logger);
for (String feNode : feNodeList) {
@@ -385,7 +278,7 @@ public class RestService implements Serializable {
}
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger
logger)
- throws DorisConnectorException, IOException {
+ throws DorisConnectorException {
ObjectMapper mapper = new ObjectMapper();
BackendV2 backend;
try {
@@ -416,262 +309,4 @@ public class RestService implements Serializable {
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
-
- @VisibleForTesting
- static String getUriStr(DorisConfig dorisConfig, Logger logger) throws
DorisConnectorException {
- String[] identifier =
parseIdentifier(dorisConfig.getTableIdentifier(), logger);
- return "http://"
- + randomEndpoint(dorisConfig.getFrontends(), logger)
- + API_PREFIX
- + "/"
- + identifier[0]
- + "/"
- + identifier[1]
- + "/";
- }
-
- public static Schema getSchema(DorisConfig dorisConfig, Logger logger)
- throws DorisConnectorException {
- logger.trace("Finding schema.");
- HttpGet httpGet = new HttpGet(getUriStr(dorisConfig, logger) + SCHEMA);
- String response = send(dorisConfig, httpGet, logger);
- logger.debug("Find schema response is '{}'.", response);
- return parseSchema(response, logger);
- }
-
- public static boolean isUniqueKeyType(DorisConfig dorisConfig, Logger
logger)
- throws DorisConnectorException {
- try {
- return UNIQUE_KEYS_TYPE.equals(getSchema(dorisConfig,
logger).getKeysType());
- } catch (Exception e) {
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
- }
- }
-
- @VisibleForTesting
- public static Schema parseSchema(String response, Logger logger)
- throws DorisConnectorException {
- logger.trace("Parse response '{}' to schema.", response);
- ObjectMapper mapper = new ObjectMapper();
- Schema schema;
- try {
- schema = mapper.readValue(response, Schema.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris FE's response is not a json. res: " +
response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris FE's response cannot map to schema. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris FE's response to json failed. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- }
-
- if (schema == null) {
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED,
- ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
- }
-
- if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
- String errMsg = "Doris FE's response is not OK, status is " +
schema.getStatus();
- logger.error(errMsg);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- logger.debug("Parsing schema result is '{}'.", schema);
- return schema;
- }
-
- public static List<PartitionDefinition> findPartitions(DorisConfig
dorisConfig, Logger logger)
- throws DorisConnectorException {
- String[] tableIdentifiers =
parseIdentifier(dorisConfig.getTableIdentifier(), logger);
- String readFields =
- StringUtils.isBlank(dorisConfig.getReadField()) ? "*" :
dorisConfig.getReadField();
- String sql =
- "select "
- + readFields
- + " from `"
- + tableIdentifiers[0]
- + "`.`"
- + tableIdentifiers[1]
- + "`";
- if (!StringUtils.isEmpty(dorisConfig.getFilterQuery())) {
- sql += " where " + dorisConfig.getFilterQuery();
- }
- logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
-
- HttpPost httpPost = new HttpPost(getUriStr(dorisConfig, logger) +
QUERY_PLAN);
- String entity = "{\"sql\": \"" + sql + "\"}";
- logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
- StringEntity stringEntity = new StringEntity(entity,
StandardCharsets.UTF_8);
- stringEntity.setContentEncoding("UTF-8");
- stringEntity.setContentType("application/json");
- httpPost.setEntity(stringEntity);
-
- String resStr = send(dorisConfig, httpPost, logger);
- logger.debug("Find partition response is '{}'.", resStr);
- QueryPlan queryPlan = getQueryPlan(resStr, logger);
- Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan,
logger);
- return tabletsMapToPartition(
- dorisConfig,
- be2Tablets,
- queryPlan.getOpaquedQueryPlan(),
- tableIdentifiers[0],
- tableIdentifiers[1],
- logger);
- }
-
- @VisibleForTesting
- static QueryPlan getQueryPlan(String response, Logger logger) throws
DorisConnectorException {
- ObjectMapper mapper = new ObjectMapper();
- QueryPlan queryPlan;
- try {
- queryPlan = mapper.readValue(response, QueryPlan.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris FE's response is not a json. res: " +
response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris FE's response cannot map to schema. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris FE's response to json failed. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- }
-
- if (queryPlan == null) {
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED,
- ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
- }
-
- if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
- String errMsg = "Doris FE's response is not OK, status is " +
queryPlan.getStatus();
- logger.error(errMsg);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- logger.debug("Parsing partition result is '{}'.", queryPlan);
- return queryPlan;
- }
-
- @VisibleForTesting
- static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan,
Logger logger)
- throws DorisConnectorException {
- Map<String, List<Long>> be2Tablets = new HashMap<>();
- for (Map.Entry<String, Tablet> part :
queryPlan.getPartitions().entrySet()) {
- logger.debug("Parse tablet info: '{}'.", part);
- long tabletId;
- try {
- tabletId = Long.parseLong(part.getKey());
- } catch (NumberFormatException e) {
- String errMsg = "Parse tablet id '" + part.getKey() + "' to
long failed.";
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg,
e);
- }
- String target = null;
- int tabletCount = Integer.MAX_VALUE;
- for (String candidate : part.getValue().getRoutings()) {
- logger.trace("Evaluate Doris BE '{}' to tablet '{}'.",
candidate, tabletId);
- if (!be2Tablets.containsKey(candidate)) {
- logger.debug(
- "Choice a new Doris BE '{}' for tablet '{}'.",
candidate, tabletId);
- List<Long> tablets = new ArrayList<>();
- be2Tablets.put(candidate, tablets);
- target = candidate;
- break;
- } else {
- if (be2Tablets.get(candidate).size() < tabletCount) {
- target = candidate;
- tabletCount = be2Tablets.get(candidate).size();
- logger.debug(
- "Current candidate Doris BE to tablet '{}' is
'{}' with tablet count {}.",
- tabletId,
- target,
- tabletCount);
- }
- }
- }
- if (target == null) {
- String errMsg = "Cannot choice Doris BE for tablet " +
tabletId;
- logger.error(errMsg);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
-
- logger.debug("Choice Doris BE '{}' for tablet '{}'.", target,
tabletId);
- be2Tablets.get(target).add(tabletId);
- }
- return be2Tablets;
- }
-
- @VisibleForTesting
- static int tabletCountLimitForOnePartition(DorisConfig dorisConfig, Logger
logger) {
- int tabletsSize = DorisOptions.DORIS_TABLET_SIZE_DEFAULT;
- if (dorisConfig.getTabletSize() != null) {
- tabletsSize = dorisConfig.getTabletSize();
- }
- if (tabletsSize < DorisOptions.DORIS_TABLET_SIZE_MIN) {
- logger.warn(
- "{} is less than {}, set to default value {}.",
- DorisOptions.DORIS_TABLET_SIZE,
- DorisOptions.DORIS_TABLET_SIZE_MIN,
- DorisOptions.DORIS_TABLET_SIZE_MIN);
- tabletsSize = DorisOptions.DORIS_TABLET_SIZE_MIN;
- }
- logger.debug("Tablet size is set to {}.", tabletsSize);
- return tabletsSize;
- }
-
- @VisibleForTesting
- static List<PartitionDefinition> tabletsMapToPartition(
- DorisConfig dorisConfig,
- Map<String, List<Long>> be2Tablets,
- String opaquedQueryPlan,
- String database,
- String table,
- Logger logger)
- throws DorisConnectorException {
- int tabletsSize = tabletCountLimitForOnePartition(dorisConfig, logger);
- List<PartitionDefinition> partitions = new ArrayList<>();
- for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
- logger.debug("Generate partition with beInfo: '{}'.", beInfo);
- HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue());
- beInfo.getValue().clear();
- beInfo.getValue().addAll(tabletSet);
- int first = 0;
- while (first < beInfo.getValue().size()) {
- Set<Long> partitionTablets =
- new HashSet<>(
- beInfo.getValue()
- .subList(
- first,
- Math.min(
-
beInfo.getValue().size(),
- first + tabletsSize)));
- first = first + tabletsSize;
- PartitionDefinition partitionDefinition =
- new PartitionDefinition(
- database,
- table,
- beInfo.getKey(),
- partitionTablets,
- opaquedQueryPlan);
- logger.debug("Generate one PartitionDefinition '{}'.",
partitionDefinition);
- partitions.add(partitionDefinition);
- }
- }
- return partitions;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
deleted file mode 100644
index f151a0e727..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/** Be response model */
-@Deprecated
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
-
- @JsonProperty(value = "rows")
- private List<BackendRow> rows;
-
- public List<BackendRow> getRows() {
- return rows;
- }
-
- public void setRows(List<BackendRow> rows) {
- this.rows = rows;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
deleted file mode 100644
index fe2260bea7..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
-@Getter
-@Setter
-@ToString
-@Deprecated
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class BackendRow {
-
- @JsonProperty(value = "HttpPort")
- private String httpPort;
-
- @JsonProperty(value = "IP")
- private String ip;
-
- @JsonProperty(value = "Alive")
- private Boolean alive;
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java
deleted file mode 100644
index 8c9d00d01a..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.Objects;
-
-public class Field {
- private String name;
- private String type;
- private String comment;
- private int precision;
- private int scale;
- private String aggregationType;
-
- public Field() {}
-
- public Field(
- String name,
- String type,
- String comment,
- int precision,
- int scale,
- String aggregationType) {
- this.name = name;
- this.type = type;
- this.comment = comment;
- this.precision = precision;
- this.scale = scale;
- this.aggregationType = aggregationType;
- }
-
- public String getAggregationType() {
- return aggregationType;
- }
-
- public void setAggregationType(String aggregationType) {
- this.aggregationType = aggregationType;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getComment() {
- return comment;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
- public int getPrecision() {
- return precision;
- }
-
- public void setPrecision(int precision) {
- this.precision = precision;
- }
-
- public int getScale() {
- return scale;
- }
-
- public void setScale(int scale) {
- this.scale = scale;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Field field = (Field) o;
- return precision == field.precision
- && scale == field.scale
- && Objects.equals(name, field.name)
- && Objects.equals(type, field.type)
- && Objects.equals(comment, field.comment);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, type, comment, precision, scale);
- }
-
- @Override
- public String toString() {
- return "Field{"
- + "name='"
- + name
- + '\''
- + ", type='"
- + type
- + '\''
- + ", comment='"
- + comment
- + '\''
- + ", precision="
- + precision
- + ", scale="
- + scale
- + '}';
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java
deleted file mode 100644
index d59c6124cd..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.Map;
-import java.util.Objects;
-
-public class QueryPlan {
- private int status;
- private String opaquedQueryPlan;
- private Map<String, Tablet> partitions;
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public String getOpaquedQueryPlan() {
- return opaquedQueryPlan;
- }
-
- public void setOpaquedQueryPlan(String opaquedQueryPlan) {
- this.opaquedQueryPlan = opaquedQueryPlan;
- }
-
- public Map<String, Tablet> getPartitions() {
- return partitions;
- }
-
- public void setPartitions(Map<String, Tablet> partitions) {
- this.partitions = partitions;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- QueryPlan queryPlan = (QueryPlan) o;
- return status == queryPlan.status
- && Objects.equals(opaquedQueryPlan, queryPlan.opaquedQueryPlan)
- && Objects.equals(partitions, queryPlan.partitions);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(status, opaquedQueryPlan, partitions);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java
deleted file mode 100644
index 60e06bba97..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class Schema {
- private int status = 0;
- private String keysType;
- private List<Field> properties;
-
- public Schema() {
- properties = new ArrayList<>();
- }
-
- public Schema(int fieldCount) {
- properties = new ArrayList<>(fieldCount);
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public String getKeysType() {
- return keysType;
- }
-
- public void setKeysType(String keysType) {
- this.keysType = keysType;
- }
-
- public List<Field> getProperties() {
- return properties;
- }
-
- public void setProperties(List<Field> properties) {
- this.properties = properties;
- }
-
- public void put(
- String name,
- String type,
- String comment,
- int scale,
- int precision,
- String aggregationType) {
- properties.add(new Field(name, type, comment, scale, precision,
aggregationType));
- }
-
- public void put(Field f) {
- properties.add(f);
- }
-
- public Field get(int index) {
- if (index >= properties.size()) {
- throw new IndexOutOfBoundsException(
- "Index: " + index + ", Fields size:" + properties.size());
- }
- return properties.get(index);
- }
-
- public int size() {
- return properties.size();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Schema schema = (Schema) o;
- return status == schema.status && Objects.equals(properties,
schema.properties);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(status, properties);
- }
-
- @Override
- public String toString() {
- return "Schema{" + "status=" + status + ", properties=" + properties +
'}';
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java
deleted file mode 100644
index cb52490997..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.List;
-import java.util.Objects;
-
-public class Tablet {
- private List<String> routings;
- private int version;
- private long versionHash;
- private long schemaHash;
-
- public List<String> getRoutings() {
- return routings;
- }
-
- public void setRoutings(List<String> routings) {
- this.routings = routings;
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public long getVersionHash() {
- return versionHash;
- }
-
- public void setVersionHash(long versionHash) {
- this.versionHash = versionHash;
- }
-
- public long getSchemaHash() {
- return schemaHash;
- }
-
- public void setSchemaHash(long schemaHash) {
- this.schemaHash = schemaHash;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Tablet tablet = (Tablet) o;
- return version == tablet.version
- && versionHash == tablet.versionHash
- && schemaHash == tablet.schemaHash
- && Objects.equals(routings, tablet.routings);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(routings, version, versionHash, schemaHash);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 8a45e34cef..e0ef8e1893 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -18,17 +18,24 @@
package org.apache.seatunnel.connectors.doris.sink;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
+import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import
org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter;
@@ -41,17 +48,22 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
public class DorisSink
implements SeaTunnelSink<SeaTunnelRow, DorisSinkState,
DorisCommitInfo, DorisCommitInfo>,
- SupportSaveMode {
+ SupportSaveMode,
+ SupportMultiTableSink {
private final DorisConfig dorisConfig;
- private final SeaTunnelRowType seaTunnelRowType;
+ private final ReadonlyConfig config;
+ private final CatalogTable catalogTable;
private String jobId;
public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) {
+ this.config = config;
+ this.catalogTable = catalogTable;
this.dorisConfig = DorisConfig.of(config);
- this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@Override
@@ -68,13 +80,13 @@ public class DorisSink
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
return new DorisSinkWriter(
- context, Collections.emptyList(), seaTunnelRowType,
dorisConfig, jobId);
+ context, Collections.emptyList(), catalogTable, dorisConfig,
jobId);
}
@Override
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>
restoreWriter(
SinkWriter.Context context, List<DorisSinkState> states) throws
IOException {
- return new DorisSinkWriter(context, states, seaTunnelRowType,
dorisConfig, jobId);
+ return new DorisSinkWriter(context, states, catalogTable, dorisConfig,
jobId);
}
@Override
@@ -94,6 +106,27 @@ public class DorisSink
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
- return Optional.empty();
+ CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CatalogFactory.class,
+ "Doris");
+ if (catalogFactory == null) {
+ throw new DorisConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK, "Cannot find
Doris catalog factory"));
+ }
+
+ Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
+ catalog.open();
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ config.get(DorisOptions.SCHEMA_SAVE_MODE),
+ config.get(DorisOptions.DATA_SAVE_MODE),
+ catalog,
+ catalogTable,
+ config.get(DorisOptions.CUSTOM_SQL)));
}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
index 6c001c6244..35b0b8da5e 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.doris.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -28,9 +29,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
+import
org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
+
+import org.apache.commons.lang3.StringUtils;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY;
+import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY;
+import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
+import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
+import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
+
@AutoService(Factory.class)
public class DorisSinkFactory
implements TableSinkFactory<
@@ -49,7 +61,50 @@ public class DorisSinkFactory
public TableSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo,
DorisCommitInfo> createSink(
TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
- CatalogTable catalogTable = context.getCatalogTable();
- return () -> new DorisSink(config, catalogTable);
+ CatalogTable catalogTable =
+ config.get(NEEDS_UNSUPPORTED_TYPE_CASTING)
+ ? UnsupportedTypeConverterUtils.convertCatalogTable(
+ context.getCatalogTable())
+ : context.getCatalogTable();
+ final CatalogTable finalCatalogTable = this.renameCatalogTable(config,
catalogTable);
+ return () -> new DorisSink(config, finalCatalogTable);
+ }
+
+ private CatalogTable renameCatalogTable(ReadonlyConfig options,
CatalogTable catalogTable) {
+ TableIdentifier tableId = catalogTable.getTableId();
+ String tableName;
+ String databaseName;
+ String tableIdentifier = options.get(TABLE_IDENTIFIER);
+ if (StringUtils.isNotEmpty(tableIdentifier)) {
+ tableName = tableIdentifier.split("\\.")[1];
+ databaseName = tableIdentifier.split("\\.")[0];
+ } else {
+ if (StringUtils.isNotEmpty(options.get(TABLE))) {
+ tableName = replaceName(options.get(TABLE), tableId);
+ } else {
+ tableName = tableId.getTableName();
+ }
+ if (StringUtils.isNotEmpty(options.get(DATABASE))) {
+ databaseName = replaceName(options.get(DATABASE), tableId);
+ } else {
+ databaseName = tableId.getDatabaseName();
+ }
+ }
+ TableIdentifier newTableId =
+ TableIdentifier.of(tableId.getCatalogName(), databaseName,
null, tableName);
+ return CatalogTable.of(newTableId, catalogTable);
+ }
+
+ private String replaceName(String original, TableIdentifier tableId) {
+ if (tableId.getTableName() != null) {
+ original = original.replace(REPLACE_TABLE_NAME_KEY,
tableId.getTableName());
+ }
+ if (tableId.getSchemaName() != null) {
+ original = original.replace(REPLACE_SCHEMA_NAME_KEY,
tableId.getSchemaName());
+ }
+ if (tableId.getDatabaseName() != null) {
+ original = original.replace(REPLACE_DATABASE_NAME_KEY,
tableId.getDatabaseName());
+ }
+ return original;
}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
index 03179f92e5..c3c5725b32 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
@@ -22,11 +22,13 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import java.io.Serializable;
+
@Setter
@Getter
@ToString
@EqualsAndHashCode
-public class DorisSinkState {
+public class DorisSinkState implements Serializable {
private final String labelPrefix;
private final long checkpointId;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 2f35357e5b..0abdc6269c 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.doris.sink.writer;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
@@ -31,6 +33,7 @@ import
org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
+import
org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -53,7 +56,9 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
@Slf4j
-public class DorisSinkWriter implements SinkWriter<SeaTunnelRow,
DorisCommitInfo, DorisSinkState> {
+public class DorisSinkWriter
+ implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>,
+ SupportMultiTableSinkWriter<Void> {
private static final int INITIAL_DELAY = 200;
private static final int CONNECT_TIMEOUT = 1000;
private static final List<String> DORIS_SUCCESS_STATUS =
@@ -66,6 +71,7 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
private final LabelGenerator labelGenerator;
private final int intervalTime;
private final DorisSerializer serializer;
+ private final CatalogTable catalogTable;
private final transient ScheduledExecutorService scheduledExecutorService;
private transient Thread executorThread;
private transient volatile Exception loadException = null;
@@ -75,21 +81,28 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
public DorisSinkWriter(
SinkWriter.Context context,
List<DorisSinkState> state,
- SeaTunnelRowType seaTunnelRowType,
+ CatalogTable catalogTable,
DorisConfig dorisConfig,
String jobId)
throws IOException {
this.dorisConfig = dorisConfig;
+ this.catalogTable = catalogTable;
this.lastCheckpointId = !state.isEmpty() ?
state.get(0).getCheckpointId() : 0;
log.info("restore checkpointId {}", lastCheckpointId);
log.info("labelPrefix " + dorisConfig.getLabelPrefix());
this.labelPrefix =
- dorisConfig.getLabelPrefix() + "_" + jobId + "_" +
context.getIndexOfSubtask();
+ dorisConfig.getLabelPrefix()
+ + "_"
+ +
catalogTable.getTablePath().getFullName().replaceAll("\\.", "_")
+ + "_"
+ + jobId
+ + "_"
+ + context.getIndexOfSubtask();
this.labelGenerator = new LabelGenerator(labelPrefix,
dorisConfig.getEnable2PC());
this.scheduledExecutorService =
new ScheduledThreadPoolExecutor(
1, new
ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
- this.serializer = createSerializer(dorisConfig, seaTunnelRowType);
+ this.serializer = createSerializer(dorisConfig,
catalogTable.getSeaTunnelRowType());
this.intervalTime = dorisConfig.getCheckInterval();
this.loading = false;
this.initializeLoad();
@@ -101,7 +114,11 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
try {
this.dorisStreamLoad =
new DorisStreamLoad(
- backend, dorisConfig, labelGenerator, new
HttpUtil().getHttpClient());
+ backend,
+ catalogTable.getTablePath(),
+ dorisConfig,
+ labelGenerator,
+ new HttpUtil().getHttpClient());
if (dorisConfig.getEnable2PC()) {
dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId +
1);
}
@@ -120,7 +137,11 @@ public class DorisSinkWriter implements
SinkWriter<SeaTunnelRow, DorisCommitInfo
@Override
public void write(SeaTunnelRow element) throws IOException {
checkLoadException();
- byte[] serialize = serializer.serialize(element);
+ byte[] serialize =
+ serializer.serialize(
+ dorisConfig.isNeedsUnsupportedTypeCasting()
+ ?
UnsupportedTypeConverterUtils.convertRow(element)
+ : element);
if (Objects.isNull(serialize)) {
return;
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 892c1da954..00f21e9ae5 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.doris.sink.writer;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
@@ -82,13 +83,13 @@ public class DorisStreamLoad implements Serializable {
public DorisStreamLoad(
String hostPort,
+ TablePath tablePath,
DorisConfig dorisConfig,
LabelGenerator labelGenerator,
CloseableHttpClient httpClient) {
this.hostPort = hostPort;
- String[] tableInfo = dorisConfig.getTableIdentifier().split("\\.");
- this.db = tableInfo[0];
- this.table = tableInfo[1];
+ this.db = tablePath.getDatabaseName();
+ this.table = tablePath.getTableName();
this.user = dorisConfig.getUsername();
this.passwd = dorisConfig.getPassword();
this.labelGenerator = labelGenerator;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index fc1884efcf..3f890af855 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -34,11 +35,14 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class DorisCatalogUtil {
public static final String ALL_DATABASES_QUERY =
@@ -109,10 +113,22 @@ public class DorisCatalogUtil {
.map(r -> "`" + r + "`")
.collect(Collectors.joining(","));
}
+ String uniqueKey = "";
+ if (!tableSchema.getConstraintKeys().isEmpty()) {
+ uniqueKey =
+ tableSchema.getConstraintKeys().stream()
+ .flatMap(c -> c.getColumnNames().stream())
+ .map(r -> "`" + r.getColumnName() + "`")
+ .collect(Collectors.joining(","));
+ }
template =
template.replaceAll(
String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_PRIMARY_KEY),
primaryKey);
+ template =
+ template.replaceAll(
+ String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_UNIQUE_KEY),
+ uniqueKey);
Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
CreateTableParser.getColumnList(template);
template = mergeColumnInTemplate(columnInTemplate, tableSchema,
template);
@@ -120,15 +136,7 @@ public class DorisCatalogUtil {
String rowTypeFields =
tableSchema.getColumns().stream()
.filter(column ->
!columnInTemplate.containsKey(column.getName()))
- .map(
- column ->
- String.format(
- "`%s` %s %s ",
- column.getName(),
- fromSeaTunnelType(
- column.getDataType(),
-
column.getColumnLength()),
- column.isNullable() ? "NULL" :
"NOT NULL"))
+ .map(DorisCatalogUtil::columnToDorisType)
.collect(Collectors.joining(",\n"));
return template.replaceAll(
String.format("\\$\\{%s\\}",
SaveModeConstants.DATABASE),
@@ -149,18 +157,18 @@ public class DorisCatalogUtil {
Map<String, Column> columnMap =
tableSchema.getColumns().stream()
.collect(Collectors.toMap(Column::getName,
Function.identity()));
- for (String col : columnInTemplate.keySet()) {
- CreateTableParser.ColumnInfo columnInfo =
columnInTemplate.get(col);
+ List<CreateTableParser.ColumnInfo> columnInfosInSeq =
+ columnInTemplate.values().stream()
+ .sorted(
+ Comparator.comparingInt(
+
CreateTableParser.ColumnInfo::getStartIndex))
+ .collect(Collectors.toList());
+ for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
+ String col = columnInfo.getName();
if (StringUtils.isEmpty(columnInfo.getInfo())) {
if (columnMap.containsKey(col)) {
Column column = columnMap.get(col);
- String newCol =
- String.format(
- "`%s` %s %s ",
- column.getName(),
- fromSeaTunnelType(
- column.getDataType(),
column.getColumnLength()),
- column.isNullable() ? "NULL" : "NOT NULL");
+ String newCol = columnToDorisType(column);
String prefix = template.substring(0,
columnInfo.getStartIndex() + offset);
String suffix = template.substring(offset +
columnInfo.getEndIndex());
if (prefix.endsWith("`")) {
@@ -181,6 +189,21 @@ public class DorisCatalogUtil {
return template;
}
+ private static String columnToDorisType(Column column) {
+ checkNotNull(column, "The column is required.");
+ return String.format(
+ "`%s` %s %s ",
+ column.getName(),
+ fromSeaTunnelType(
+ column.getDataType(),
+ Math.max(
+ column.getColumnLength() == null ? 0 :
column.getColumnLength(),
+ column.getLongColumnLength() == null
+ ? 0
+ : column.getLongColumnLength())),
+ column.isNullable() ? "NULL" : "NOT NULL");
+ }
+
public static SeaTunnelDataType<?> fromDorisType(ResultSet rs) throws
SQLException {
String type = rs.getString(5).toUpperCase();
@@ -214,6 +237,7 @@ public class DorisCatalogUtil {
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case "DECIMAL":
case "DECIMALV2":
+ case "DECIMALV3":
int precision = rs.getInt(8);
int scale = rs.getInt(9);
return new DecimalType(precision, scale);
@@ -233,14 +257,16 @@ public class DorisCatalogUtil {
}
}
- private static String fromSeaTunnelType(SeaTunnelDataType<?> dataType,
Integer columnLength) {
+ private static String fromSeaTunnelType(SeaTunnelDataType<?> dataType,
Long columnLength) {
switch (dataType.getSqlType()) {
case STRING:
- if (columnLength != null && columnLength > 65533) {
- return "STRING";
+ if (columnLength != null && columnLength <= 65533 &&
columnLength > 0) {
+ return String.format("VARCHAR(%d)", columnLength);
}
- return String.format("VARCHAR(%d)", columnLength);
+ return "STRING";
+ case BYTES:
+ return "STRING";
case NULL:
return "NULL_TYPE";
case BOOLEAN:
@@ -260,13 +286,21 @@ public class DorisCatalogUtil {
return String.format(
"DECIMALV3(%d,%d)", decimalType.getPrecision(),
decimalType.getScale());
case TIME:
- return "TIME";
+ return "VARCHAR(8)";
case DATE:
return "DATEV2";
case TIMESTAMP:
return "DATETIME";
+ case ARRAY:
+ return "ARRAY<"
+ + fromSeaTunnelType(
+ ((ArrayType<?, ?>) dataType).getElementType(),
Long.MAX_VALUE)
+ + ">";
+ case MAP:
case ROW:
return "JSONB";
+ case TINYINT:
+ return "TINYINT";
default:
throw new CatalogException(String.format("Unsupported doris
type: %s", dataType));
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
deleted file mode 100644
index 3e914d7d2a..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.util;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.util.Properties;
-
-public class IOUtils {
- public static String propsToString(Properties props) throws
IllegalArgumentException {
- StringWriter sw = new StringWriter();
- if (props != null) {
- try {
- props.store(sw, "");
- } catch (IOException ex) {
- throw new IllegalArgumentException("Cannot parse props to
String.", ex);
- }
- }
- return sw.toString();
- }
-
- public static Properties propsFromString(String source) throws
IllegalArgumentException {
- Properties copy = new Properties();
- if (source != null) {
- try {
- copy.load(new StringReader(source));
- } catch (IOException ex) {
- throw new IllegalArgumentException("Cannot parse props from
String.", ex);
- }
- }
- return copy;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java
new file mode 100644
index 0000000000..bcf0ed534f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.util;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
+
+public class UnsupportedTypeConverterUtils {
+ public static Object convertBigDecimal(BigDecimal bigDecimal) {
+ if (bigDecimal.precision() > 38) {
+ return bigDecimal.doubleValue();
+ }
+ return bigDecimal;
+ }
+
+ public static SeaTunnelRow convertRow(SeaTunnelRow row) {
+ List<Object> newValues =
+ Arrays.stream(row.getFields())
+ .map(
+ value -> {
+ if (value instanceof BigDecimal) {
+ return convertBigDecimal((BigDecimal)
value);
+ }
+ return value;
+ })
+ .collect(Collectors.toList());
+ return new SeaTunnelRow(newValues.toArray());
+ }
+
+ public static CatalogTable convertCatalogTable(CatalogTable catalogTable) {
+ TableSchema tableSchema = catalogTable.getTableSchema();
+ List<Column> columns = tableSchema.getColumns();
+ List<Column> newColumns =
+ columns.stream()
+ .map(
+ column -> {
+ if
(column.getDataType().getSqlType().equals(SqlType.DECIMAL)) {
+ DecimalType decimalType =
+ (DecimalType)
column.getDataType();
+ if (decimalType.getPrecision() > 38) {
+ return PhysicalColumn.of(
+ column.getName(),
+ DOUBLE_TYPE,
+ 22,
+ column.isNullable(),
+ null,
+ column.getComment(),
+ "DOUBLE",
+ false,
+ false,
+ 0L,
+ column.getOptions(),
+ 22L);
+ }
+ }
+ return column;
+ })
+ .collect(Collectors.toList());
+ TableSchema newtableSchema =
+ TableSchema.builder()
+ .columns(newColumns)
+ .primaryKey(tableSchema.getPrimaryKey())
+ .constraintKey(tableSchema.getConstraintKeys())
+ .build();
+
+ return CatalogTable.of(
+ catalogTable.getTableId(),
+ newtableSchema,
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment(),
+ catalogTable.getCatalogName());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
new file mode 100644
index 0000000000..404351090e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class DorisCreateTableTest {
+
+ @Test
+ public void test() {
+
+ List<Column> columns = new ArrayList<>();
+
+ columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, null, true,
null, ""));
+ columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, null,
true, null, ""));
+ columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true,
null, ""));
+ columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, null, true,
null, ""));
+ columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, null,
true, null, ""));
+ columns.add(PhysicalColumn.of("create_time", BasicType.LONG_TYPE,
null, true, null, ""));
+
+ String result =
+ DorisCatalogUtil.getCreateTableStatement(
+ "CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (
\n"
+ + "${rowtype_primary_key} , \n"
+ + "${rowtype_unique_key} , \n"
+ + "`create_time` DATETIME NOT NULL , \n"
+ + "${rowtype_fields} \n"
+ + ") ENGINE=OLAP \n"
+ + "PRIMARY
KEY(${rowtype_primary_key},`create_time`) \n"
+ + "PARTITION BY RANGE (`create_time`)( \n"
+ + " PARTITION p20230329 VALUES LESS THAN
(\"2023-03-29\")
\n"
+ + ") \n"
+ + "DISTRIBUTED BY HASH
(${rowtype_primary_key}) \n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" =
\"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" = \"false\"\n"
+ + ")",
+ TablePath.of("test1.test2"),
+ CatalogTable.of(
+ TableIdentifier.of("test", "test1", "test2"),
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of("",
Arrays.asList("id", "age")))
+ .constraintKey(
+ Arrays.asList(
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType
+
.UNIQUE_KEY,
+ "unique_key",
+
Collections.singletonList(
+
ConstraintKey
+
.ConstraintKeyColumn
+
.of(
+
"name",
+
ConstraintKey
+
.ColumnSortType
+
.DESC))),
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType
+
.UNIQUE_KEY,
+ "unique_key2",
+
Collections.singletonList(
+
ConstraintKey
+
.ConstraintKeyColumn
+
.of(
+
"score",
+
ConstraintKey
+
.ColumnSortType
+
.ASC)))))
+ .columns(columns)
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ ""));
+ Assertions.assertEquals(
+ result,
+ "CREATE TABLE IF NOT EXISTS `test1`.`test2` (
\n"
+ + "`id` BIGINT(1) NULL ,`age` INT(1) NULL , \n"
+ + "`name` STRING NULL ,`score` INT(1) NULL , \n"
+ + "`create_time` DATETIME NOT NULL , \n"
+ + "`gender` TINYINT NULL \n"
+ + ") ENGINE=OLAP \n"
+ + "PRIMARY KEY(`id`,`age`,`create_time`) \n"
+ + "PARTITION BY RANGE (`create_time`)( \n"
+ + " PARTITION p20230329 VALUES LESS THAN
(\"2023-03-29\")
\n"
+ + ") \n"
+ + "DISTRIBUTED BY HASH (`id`,`age`) \n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" =
\"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" = \"false\"\n"
+ + ")");
+ }
+
+ @Test
+ public void testInSeq() {
+
+ List<Column> columns = new ArrayList<>();
+
+ columns.add(PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, null,
false, null, ""));
+ columns.add(PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, null,
false, null, ""));
+ columns.add(PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, null,
false, null, ""));
+ columns.add(PhysicalColumn.of("L_LINENUMBER", BasicType.INT_TYPE,
null, false, null, ""));
+ columns.add(PhysicalColumn.of("L_QUANTITY", new DecimalType(15, 2),
null, false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_EXTENDEDPRICE", new DecimalType(15, 2), null,
false, null, ""));
+ columns.add(PhysicalColumn.of("L_DISCOUNT", new DecimalType(15, 2),
null, false, null, ""));
+ columns.add(PhysicalColumn.of("L_TAX", new DecimalType(15, 2), null,
false, null, ""));
+ columns.add(
+ PhysicalColumn.of("L_RETURNFLAG", BasicType.STRING_TYPE, null,
false, null, ""));
+ columns.add(
+ PhysicalColumn.of("L_LINESTATUS", BasicType.STRING_TYPE, null,
false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, null,
false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_COMMITDATE", LocalTimeType.LOCAL_DATE_TYPE, null,
false, null, ""));
+ columns.add(
+ PhysicalColumn.of(
+ "L_RECEIPTDATE", LocalTimeType.LOCAL_DATE_TYPE, null,
false, null, ""));
+ columns.add(
+ PhysicalColumn.of("L_SHIPINSTRUCT", BasicType.STRING_TYPE,
null, false, null, ""));
+ columns.add(PhysicalColumn.of("L_SHIPMODE", BasicType.STRING_TYPE,
null, false, null, ""));
+ columns.add(PhysicalColumn.of("L_COMMENT", BasicType.STRING_TYPE,
null, false, null, ""));
+
+ String result =
+ DorisCatalogUtil.getCreateTableStatement(
+ "CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n"
+ + "`L_COMMITDATE`,\n"
+ + "${rowtype_primary_key},\n"
+ + "L_SUPPKEY BIGINT NOT NULL,\n"
+ + "${rowtype_fields}\n"
+ + ") ENGINE=OLAP\n"
+ + " PRIMARY KEY (L_COMMITDATE,
${rowtype_primary_key}, L_SUPPKEY)\n"
+ + "DISTRIBUTED BY HASH
(${rowtype_primary_key})"
+ + "PROPERTIES (\n"
+ + " \"replication_num\" = \"1\" \n"
+ + ")",
+ TablePath.of("tpch", "lineitem"),
+ CatalogTable.of(
+ TableIdentifier.of("test", "tpch", "lineitem"),
+ TableSchema.builder()
+ .primaryKey(
+ PrimaryKey.of(
+ "",
+ Arrays.asList(
+ "L_ORDERKEY",
"L_LINENUMBER")))
+ .columns(columns)
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ ""));
+ String expected =
+ "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+ + "`L_COMMITDATE` DATEV2 NOT NULL ,\n"
+ + "`L_ORDERKEY` INT(1) NOT NULL ,`L_LINENUMBER` INT(1)
NOT NULL ,\n"
+ + "L_SUPPKEY BIGINT NOT NULL,\n"
+ + "`L_PARTKEY` INT(1) NOT NULL ,\n"
+ + "`L_QUANTITY` DECIMALV3(15,2) NOT NULL ,\n"
+ + "`L_EXTENDEDPRICE` DECIMALV3(15,2) NOT NULL ,\n"
+ + "`L_DISCOUNT` DECIMALV3(15,2) NOT NULL ,\n"
+ + "`L_TAX` DECIMALV3(15,2) NOT NULL ,\n"
+ + "`L_RETURNFLAG` STRING NOT NULL ,\n"
+ + "`L_LINESTATUS` STRING NOT NULL ,\n"
+ + "`L_SHIPDATE` DATEV2 NOT NULL ,\n"
+ + "`L_RECEIPTDATE` DATEV2 NOT NULL ,\n"
+ + "`L_SHIPINSTRUCT` STRING NOT NULL ,\n"
+ + "`L_SHIPMODE` STRING NOT NULL ,\n"
+ + "`L_COMMENT` STRING NOT NULL \n"
+ + ") ENGINE=OLAP\n"
+ + " PRIMARY KEY (L_COMMITDATE,
`L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n"
+ + "DISTRIBUTED BY HASH
(`L_ORDERKEY`,`L_LINENUMBER`)PROPERTIES (\n"
+ + " \"replication_num\" = \"1\" \n"
+ + ")";
+ Assertions.assertEquals(result, expected);
+ }
+
+ @Test
+ public void testWithVarchar() {
+
+ List<Column> columns = new ArrayList<>();
+
+ columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, null, true,
null, ""));
+ columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, null,
true, null, ""));
+ columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true,
null, ""));
+ columns.add(PhysicalColumn.of("comment", BasicType.STRING_TYPE, 500,
true, null, ""));
+ columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE,
70000, true, null, ""));
+
+ String result =
+ DorisCatalogUtil.getCreateTableStatement(
+ "CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (
\n"
+ + "${rowtype_primary_key} , \n"
+ + "`create_time` DATETIME NOT NULL , \n"
+ + "${rowtype_fields} \n"
+ + ") ENGINE=OLAP \n"
+ + "PRIMARY
KEY(${rowtype_primary_key},`create_time`) \n"
+ + "PARTITION BY RANGE (`create_time`)( \n"
+ + " PARTITION p20230329 VALUES LESS THAN
(\"2023-03-29\")
\n"
+ + ") \n"
+ + "DISTRIBUTED BY HASH
(${rowtype_primary_key}) \n"
+ + "PROPERTIES ( \n"
+ + " \"dynamic_partition.enable\" =
\"true\",
\n"
+ + " \"dynamic_partition.time_unit\" =
\"DAY\",
\n"
+ + " \"dynamic_partition.end\" = \"3\", \n"
+ + " \"dynamic_partition.prefix\" = \"p\"
\n"
+ + ");",
+ TablePath.of("test1", "test2"),
+ CatalogTable.of(
+ TableIdentifier.of("test", "test1", "test2"),
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of("",
Arrays.asList("id", "age")))
+ .columns(columns)
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ ""));
+
+ Assertions.assertEquals(
+ result,
+ "CREATE TABLE IF NOT EXISTS `test1`.`test2` (
\n"
+ + "`id` BIGINT(1) NULL ,`age` INT(1) NULL , \n"
+ + "`create_time` DATETIME NOT NULL , \n"
+ + "`name` STRING NULL ,\n"
+ + "`comment` VARCHAR(500) NULL ,\n"
+ + "`description` STRING NULL \n"
+ + ") ENGINE=OLAP \n"
+ + "PRIMARY KEY(`id`,`age`,`create_time`) \n"
+ + "PARTITION BY RANGE (`create_time`)( \n"
+ + " PARTITION p20230329 VALUES LESS THAN
(\"2023-03-29\")
\n"
+ + ") \n"
+ + "DISTRIBUTED BY HASH (`id`,`age`) \n"
+ + "PROPERTIES ( \n"
+ + " \"dynamic_partition.enable\" = \"true\",
\n"
+ + " \"dynamic_partition.time_unit\" = \"DAY\",
\n"
+ + " \"dynamic_partition.end\" = \"3\", \n"
+ + " \"dynamic_partition.prefix\" = \"p\"
\n"
+ + ");");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 4722b84025..275c714686 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -18,17 +18,21 @@
package org.apache.seatunnel.e2e.connector.doris;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -42,12 +46,32 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
@Slf4j
public class DorisCatalogIT extends AbstractDorisIT {
private static final String DATABASE = "test";
private static final String SINK_TABLE = "doris_catalog_e2e";
+ private static final TablePath tablePath = TablePath.of(DATABASE,
SINK_TABLE);
+ private static final CatalogTable catalogTable;
+
+ static {
+ TableSchema.Builder builder = TableSchema.builder();
+ builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10, false,
0, "k1"));
+ builder.column(PhysicalColumn.of("k2", BasicType.STRING_TYPE, 64,
false, "", "k2"));
+ builder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10,
true, null, "v1"));
+ builder.column(PhysicalColumn.of("v2", new DecimalType(10, 2), 0,
false, 0.1, "v2"));
+ builder.primaryKey(PrimaryKey.of("pk", Arrays.asList("k1", "k2")));
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("doris", tablePath),
+ builder.build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "test");
+ }
+
private DorisCatalogFactory factory;
private DorisCatalog catalog;
@@ -74,7 +98,6 @@ public class DorisCatalogIT extends AbstractDorisIT {
map.put(DorisOptions.QUERY_PORT.key(), QUERY_PORT);
map.put(DorisOptions.USERNAME.key(), USERNAME);
map.put(DorisOptions.PASSWORD.key(), PASSWORD);
- map.put(DorisOptions.DEFAULT_DATABASE.key(), PASSWORD);
catalog = (DorisCatalog) factory.createCatalog(catalogName,
ReadonlyConfig.fromMap(map));
@@ -98,26 +121,10 @@ public class DorisCatalogIT extends AbstractDorisIT {
return;
}
- TablePath tablePath = TablePath.of(DATABASE, SINK_TABLE);
-
- TableSchema.Builder builder = TableSchema.builder();
- builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10, false,
0, "k1"));
- builder.column(PhysicalColumn.of("k2", BasicType.STRING_TYPE, 64,
false, "", "k2"));
- builder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10,
true, null, "v1"));
- builder.column(PhysicalColumn.of("v2", new DecimalType(10, 2), 0,
false, 0.1, "v2"));
- builder.primaryKey(PrimaryKey.of("pk", Arrays.asList("k1", "k2")));
- CatalogTable catalogTable =
- CatalogTable.of(
- TableIdentifier.of("doris", DATABASE, SINK_TABLE),
- builder.build(),
- Collections.emptyMap(),
- Collections.emptyList(),
- "test");
-
boolean dbCreated = false;
List<String> databases = catalog.listDatabases();
- Assertions.assertEquals(databases.size(), 1);
+ Assertions.assertFalse(databases.isEmpty());
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
catalog.createDatabase(tablePath, false);
@@ -129,7 +136,7 @@ public class DorisCatalogIT extends AbstractDorisIT {
Assertions.assertTrue(catalog.tableExists(tablePath));
List<String> tables = catalog.listTables(tablePath.getDatabaseName());
- Assertions.assertEquals(tables.size(), 1);
+ Assertions.assertFalse(tables.isEmpty());
catalog.dropTable(tablePath, false);
Assertions.assertFalse(catalog.tableExists(tablePath));
@@ -140,6 +147,113 @@ public class DorisCatalogIT extends AbstractDorisIT {
}
}
+ @Test
+ void testSaveMode() {
+ CatalogTable upstreamTable =
+ CatalogTable.of(
+ TableIdentifier.of("doris",
TablePath.of("test.test")), catalogTable);
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ DorisOptions.FENODES.key(),
+ container.getHost() + ":" + HTTP_PORT);
+ put(DorisOptions.USERNAME.key(), USERNAME);
+ put(DorisOptions.PASSWORD.key(), PASSWORD);
+ }
+ });
+ assertCreateTable(upstreamTable, config, "test.test");
+
+ ReadonlyConfig config2 =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ DorisOptions.FENODES.key(),
+ container.getHost() + ":" + HTTP_PORT);
+ put(DorisOptions.DATABASE.key(), "test2");
+ put(DorisOptions.TABLE.key(), "test2");
+ put(DorisOptions.USERNAME.key(), USERNAME);
+ put(DorisOptions.PASSWORD.key(), PASSWORD);
+ }
+ });
+ assertCreateTable(upstreamTable, config2, "test2.test2");
+
+ ReadonlyConfig config3 =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ DorisOptions.FENODES.key(),
+ container.getHost() + ":" + HTTP_PORT);
+ put(DorisOptions.TABLE_IDENTIFIER.key(),
"test3.test3");
+ put(DorisOptions.USERNAME.key(), USERNAME);
+ put(DorisOptions.PASSWORD.key(), PASSWORD);
+ }
+ });
+ assertCreateTable(upstreamTable, config3, "test3.test3");
+
+ ReadonlyConfig config4 =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ DorisOptions.FENODES.key(),
+ container.getHost() + ":" + HTTP_PORT);
+ put(DorisOptions.DATABASE.key(), "test5");
+ put(DorisOptions.TABLE.key(), "${table_name}");
+ put(DorisOptions.USERNAME.key(), USERNAME);
+ put(DorisOptions.PASSWORD.key(), PASSWORD);
+ }
+ });
+ assertCreateTable(upstreamTable, config4, "test5.test");
+
+ ReadonlyConfig config5 =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ DorisOptions.FENODES.key(),
+ container.getHost() + ":" + HTTP_PORT);
+ put(DorisOptions.DATABASE.key(), "test4");
+ put(DorisOptions.TABLE.key(), "test4");
+ put(DorisOptions.USERNAME.key(), USERNAME);
+ put(DorisOptions.PASSWORD.key(), PASSWORD);
+
put(DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true);
+ }
+ });
+ upstreamTable
+ .getTableSchema()
+ .getColumns()
+ .add(PhysicalColumn.of("v3", new DecimalType(66, 22), 66,
false, null, "v3"));
+ CatalogTable newTable = assertCreateTable(upstreamTable, config5,
"test4.test4");
+ Assertions.assertEquals(
+ BasicType.DOUBLE_TYPE,
newTable.getTableSchema().getColumns().get(4).getDataType());
+ }
+
+ private CatalogTable assertCreateTable(
+ CatalogTable upstreamTable, ReadonlyConfig config, String
fullName) {
+ DorisSinkFactory dorisSinkFactory = new DorisSinkFactory();
+ TableSinkFactoryContext context =
+ new TableSinkFactoryContext(
+ upstreamTable, config,
Thread.currentThread().getContextClassLoader());
+ SupportSaveMode sink = (SupportSaveMode)
dorisSinkFactory.createSink(context).createSink();
+ sink.getSaveModeHandler().get().handleSaveMode();
+ CatalogTable createdTable = catalog.getTable(TablePath.of(fullName));
+ Assertions.assertEquals(
+ upstreamTable.getTableSchema().getColumns().size(),
+ createdTable.getTableSchema().getColumns().size());
+ Assertions.assertIterableEquals(
+ upstreamTable.getTableSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toList()),
+ createdTable.getTableSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toList()));
+ return createdTable;
+ }
+
@AfterAll
public void close() {
if (catalog != null) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
index a4b2fbbd74..d4d4e69f9d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
@@ -36,7 +36,8 @@ sink {
fenodes = "10.16.10.14:8234"
username = root
password = ""
- table.identifier = "test.e2e_table_sink"
+ database = "test"
+ table = "e2e_table_sink"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "false"
sink.enable-delete = "true"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
index f656130239..4c717c14c8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
@@ -38,7 +38,8 @@ sink {
fenodes = "doris_e2e:8030"
username = root
password = ""
- table.identifier = "test.e2e_table_sink"
+ database = "test"
+ table = "e2e_table_sink"
sink.enable-2pc = "false"
sink.label-prefix = "test_doris"
doris.config = {