This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 268d76cbf3 [Improve] doris options (#8745)
268d76cbf3 is described below
commit 268d76cbf3961d7aa72d7892ee293b7924131d1d
Author: Jarvis <[email protected]>
AuthorDate: Tue Feb 25 21:50:12 2025 +0800
[Improve] doris options (#8745)
---
.../connectors/doris/catalog/DorisCatalog.java | 8 +--
.../doris/catalog/DorisCatalogFactory.java | 20 ++++--
.../{DorisOptions.java => DorisBaseOptions.java} | 33 +++------
.../connectors/doris/config/DorisSinkConfig.java | 14 ++--
.../connectors/doris/config/DorisSinkOptions.java | 82 +++++++---------------
.../connectors/doris/config/DorisSourceConfig.java | 8 +--
.../doris/config/DorisSourceOptions.java | 43 ++++++------
.../connectors/doris/config/DorisTableConfig.java | 6 +-
.../doris/datatype/AbstractDorisTypeConverter.java | 2 +-
.../doris/datatype/DorisTypeConverterV1.java | 2 +-
.../doris/datatype/DorisTypeConverterV2.java | 2 +-
.../connectors/doris/sink/DorisSinkFactory.java | 43 ++++++++++--
.../doris/source/DorisSourceFactory.java | 42 +++++------
.../doris/source/reader/DorisValueReader.java | 4 +-
.../e2e/connector/doris/DorisCatalogIT.java | 74 ++++++++++---------
15 files changed, 191 insertions(+), 192 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index ebc8f93785..0de8e352c4 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -39,7 +39,7 @@ import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
import
org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterFactory;
import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV2;
import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
@@ -463,9 +463,9 @@ public class DorisCatalog implements Catalog {
private Map<String, String> connectorOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "doris");
- options.put(DorisOptions.FENODES.key(), String.join(",",
frontEndNodes));
- options.put(DorisOptions.USERNAME.key(), username);
- options.put(DorisOptions.PASSWORD.key(), password);
+ options.put(DorisBaseOptions.FENODES.key(), String.join(",",
frontEndNodes));
+ options.put(DorisBaseOptions.USERNAME.key(), username);
+ options.put(DorisBaseOptions.PASSWORD.key(), password);
return options;
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
index 7fd1da603e..3d4d612825 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
@@ -22,12 +22,12 @@ import
org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
import static
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
@AutoService(Factory.class)
@@ -37,10 +37,10 @@ public class DorisCatalogFactory implements CatalogFactory {
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
return new DorisCatalog(
catalogName,
- options.get(DorisOptions.FENODES),
- options.get(DorisOptions.QUERY_PORT),
- options.get(DorisOptions.USERNAME),
- options.get(DorisOptions.PASSWORD),
+ options.get(DorisBaseOptions.FENODES),
+ options.get(DorisBaseOptions.QUERY_PORT),
+ options.get(DorisBaseOptions.USERNAME),
+ options.get(DorisBaseOptions.PASSWORD),
options.get(SAVE_MODE_CREATE_TEMPLATE),
options.get(DorisSinkOptions.DEFAULT_DATABASE));
}
@@ -52,6 +52,12 @@ public class DorisCatalogFactory implements CatalogFactory {
@Override
public OptionRule optionRule() {
- return DorisOptions.CATALOG_RULE.build();
+ return OptionRule.builder()
+ .required(
+ DorisBaseOptions.FENODES,
+ DorisBaseOptions.QUERY_PORT,
+ DorisBaseOptions.USERNAME,
+ DorisBaseOptions.PASSWORD)
+ .build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java
similarity index 70%
rename from
seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
rename to
seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java
index bcdf24c9d7..2d5989dd2e 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java
@@ -19,58 +19,45 @@ package org.apache.seatunnel.connectors.doris.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-public interface DorisOptions {
+public class DorisBaseOptions {
- String IDENTIFIER = "Doris";
- String DORIS_DEFAULT_CLUSTER = "default_cluster";
- int DORIS_BATCH_SIZE_DEFAULT = 1024;
+ public static final String IDENTIFIER = "Doris";
// common option
- Option<String> FENODES =
+ public static final Option<String> FENODES =
Options.key("fenodes")
.stringType()
.noDefaultValue()
.withDescription("doris fe http address.");
- Option<Integer> QUERY_PORT =
+ public static final Option<Integer> QUERY_PORT =
Options.key("query-port")
.intType()
.defaultValue(9030)
.withDescription("doris query port");
- @Deprecated
- Option<String> TABLE_IDENTIFIER =
- Options.key("table.identifier")
- .stringType()
- .noDefaultValue()
- .withDescription("the doris table name.");
-
- Option<String> USERNAME =
+ public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("the doris user name.");
- Option<String> PASSWORD =
+ public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("the doris password.");
- Option<String> TABLE =
+ public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("table");
- Option<String> DATABASE =
+ public static final Option<String> DATABASE =
Options.key("database").stringType().noDefaultValue().withDescription("database");
- Option<Integer> DORIS_BATCH_SIZE =
+ public static final Option<Integer> DORIS_BATCH_SIZE =
Options.key("doris.batch.size")
.intType()
- .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+ .defaultValue(1024)
.withDescription("the batch size of the doris
read/write.");
-
- OptionRule.Builder CATALOG_RULE =
- OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME,
PASSWORD);
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
index 8f0d948042..ee750e15c3 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
@@ -29,13 +29,13 @@ import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
import static
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX;
import static
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
import static
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
index 9884eb0554..28494eb40e 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
@@ -19,68 +19,59 @@ package org.apache.seatunnel.connectors.doris.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import java.util.Map;
-import static
org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
+public class DorisSinkOptions extends DorisBaseOptions {
-public interface DorisSinkOptions {
-
- int DEFAULT_SINK_CHECK_INTERVAL = 10000;
- int DEFAULT_SINK_MAX_RETRIES = 3;
- int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024;
- int DEFAULT_SINK_BUFFER_COUNT = 3;
+ @Deprecated
+ public static final Option<String> TABLE_IDENTIFIER =
+ Options.key("table.identifier")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the doris table name.");
- Option<Boolean> SINK_ENABLE_2PC =
+ public static final Option<Boolean> SINK_ENABLE_2PC =
Options.key("sink.enable-2pc")
.booleanType()
.defaultValue(false)
.withDescription("enable 2PC while loading");
- Option<Integer> SINK_CHECK_INTERVAL =
+ public static final Option<Integer> SINK_CHECK_INTERVAL =
Options.key("sink.check-interval")
.intType()
- .defaultValue(DEFAULT_SINK_CHECK_INTERVAL)
+ .defaultValue(10000)
.withDescription("check exception with the interval while
loading");
- Option<Integer> SINK_MAX_RETRIES =
+ public static final Option<Integer> SINK_MAX_RETRIES =
Options.key("sink.max-retries")
.intType()
- .defaultValue(DEFAULT_SINK_MAX_RETRIES)
+ .defaultValue(3)
.withDescription("the max retry times if writing records
to database failed.");
- Option<Integer> SINK_BUFFER_SIZE =
+ public static final Option<Integer> SINK_BUFFER_SIZE =
Options.key("sink.buffer-size")
.intType()
- .defaultValue(DEFAULT_SINK_BUFFER_SIZE)
+ .defaultValue(256 * 1024)
.withDescription("the buffer size to cache data for stream
load.");
- Option<Integer> SINK_BUFFER_COUNT =
+ public static final Option<Integer> SINK_BUFFER_COUNT =
Options.key("sink.buffer-count")
.intType()
- .defaultValue(DEFAULT_SINK_BUFFER_COUNT)
+ .defaultValue(3)
.withDescription("the buffer count to cache data for
stream load.");
- Option<String> SINK_LABEL_PREFIX =
+ public static final Option<String> SINK_LABEL_PREFIX =
Options.key("sink.label-prefix")
.stringType()
.defaultValue("")
.withDescription("the unique label prefix.");
- Option<Boolean> SINK_ENABLE_DELETE =
+ public static final Option<Boolean> SINK_ENABLE_DELETE =
Options.key("sink.enable-delete")
.booleanType()
.defaultValue(false)
.withDescription("whether to enable the delete function");
- Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
+ public static final Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
Options.key("doris.config")
.mapType()
.noDefaultValue()
@@ -88,28 +79,28 @@ public interface DorisSinkOptions {
"The parameter of the Stream Load data_desc. "
+ "The way to specify the parameter is to
add the prefix `doris.config` to the original load parameter name ");
- Option<String> DEFAULT_DATABASE =
+ public static final Option<String> DEFAULT_DATABASE =
Options.key("default-database")
.stringType()
.defaultValue("information_schema")
.withDescription("");
- Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription("schema_save_mode");
- Option<DataSaveMode> DATA_SAVE_MODE =
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data_save_mode");
- Option<String> CUSTOM_SQL =
+ public static final Option<String> CUSTOM_SQL =
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
- Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
+ public static final Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
Options.key("needs_unsupported_type_casting")
.booleanType()
.defaultValue(false)
@@ -117,7 +108,7 @@ public interface DorisSinkOptions {
"Whether to enable the unsupported type casting,
such as Decimal64 to Double");
// create table
- Option<String> SAVE_MODE_CREATE_TEMPLATE =
+ public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
.stringType()
.defaultValue(
@@ -147,27 +138,4 @@ public interface DorisSinkOptions {
+ "\"disable_auto_compaction\" =
\"false\"\n"
+ ")")
.withDescription("Create table statement template, used to
create Doris table");
-
- OptionRule.Builder SINK_RULE =
- OptionRule.builder()
- .required(
- FENODES,
- USERNAME,
- PASSWORD,
- SINK_LABEL_PREFIX,
- DORIS_SINK_CONFIG_PREFIX,
- DATA_SAVE_MODE,
- SCHEMA_SAVE_MODE)
- .optional(
- DATABASE,
- TABLE,
- TABLE_IDENTIFIER,
- QUERY_PORT,
- DORIS_BATCH_SIZE,
- SINK_ENABLE_2PC,
- SINK_ENABLE_DELETE,
- MULTI_TABLE_SINK_REPLICA,
- SAVE_MODE_CREATE_TEMPLATE,
- NEEDS_UNSUPPORTED_TYPE_CASTING)
- .conditional(DATA_SAVE_MODE,
DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL);
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java
index 999f8fbfea..94615fcd85 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java
@@ -25,10 +25,10 @@ import lombok.experimental.SuperBuilder;
import java.io.Serializable;
import java.util.List;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java
index 2ee852ffcc..49b9f85ea7 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java
@@ -22,80 +22,81 @@ import org.apache.seatunnel.api.configuration.Options;
import java.util.List;
-public interface DorisSourceOptions {
+public class DorisSourceOptions extends DorisBaseOptions {
- int DORIS_TABLET_SIZE_MIN = 1;
- int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
- int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
- int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
- int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
- int DORIS_REQUEST_RETRIES_DEFAULT = 3;
- Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
- int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
- long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
+ public static final String DORIS_DEFAULT_CLUSTER = "default_cluster";
+ public static final int DORIS_TABLET_SIZE_MIN = 1;
+ public static final int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
+ public static final int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 *
1000;
+ public static final int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
+ public static final int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
+ public static final int DORIS_REQUEST_RETRIES_DEFAULT = 3;
+ public static final Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
+ public static final int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+ public static final long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
- Option<List<DorisTableConfig>> TABLE_LIST =
+ public static final Option<List<DorisTableConfig>> TABLE_LIST =
Options.key("table_list")
.listType(DorisTableConfig.class)
.noDefaultValue()
.withDescription("table list config.");
- Option<String> DORIS_READ_FIELD =
+ public static final Option<String> DORIS_READ_FIELD =
Options.key("doris.read.field")
.stringType()
.noDefaultValue()
.withDescription(
"List of column names in the Doris table,
separated by commas");
- Option<String> DORIS_FILTER_QUERY =
+ public static final Option<String> DORIS_FILTER_QUERY =
Options.key("doris.filter.query")
.stringType()
.noDefaultValue()
.withDescription(
"Filter expression of the query, which is
transparently transmitted to Doris. Doris uses this expression to complete
source-side data filtering");
- Option<Integer> DORIS_TABLET_SIZE =
+ public static final Option<Integer> DORIS_TABLET_SIZE =
Options.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription("");
- Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
+ public static final Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
Options.key("doris.request.connect.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.withDescription("");
- Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
+ public static final Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
Options.key("doris.request.read.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.withDescription("");
- Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
+ public static final Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
Options.key("doris.request.query.timeout.s")
.intType()
.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
.withDescription("");
- Option<Integer> DORIS_REQUEST_RETRIES =
+ public static final Option<Integer> DORIS_REQUEST_RETRIES =
Options.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("");
- Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
+ public static final Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
Options.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
- Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
+ public static final Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
Options.key("doris.request.retriesdoris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
- Option<Long> DORIS_EXEC_MEM_LIMIT =
+ public static final Option<Long> DORIS_EXEC_MEM_LIMIT =
Options.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
index 624d25636b..bfd2fc7873 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
@@ -35,9 +35,9 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_EXEC_MEM_LIMIT;
import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY;
import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
index 67266b453f..a4888f3fdd 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
@@ -31,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Locale;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
@Slf4j
public abstract class AbstractDorisTypeConverter implements
TypeConverter<BasicTypeDefine> {
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
index 9b7e98368f..c508b9a14c 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.LocalTimeType;
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
/** Doris type converter for version 1.2.x */
@Slf4j
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
index 46ae79251e..4a24e1d5da 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
@@ -36,7 +36,7 @@ import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
/** Doris type converter for version 2.x */
@Slf4j
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
index 9a2ce67be2..009d060506 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.doris.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -38,10 +40,8 @@ import com.google.auto.service.AutoService;
import java.util.Arrays;
import java.util.List;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
+import static
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
import static
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
@AutoService(Factory.class)
@@ -49,12 +49,41 @@ public class DorisSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return IDENTIFIER;
+ return DorisSinkOptions.IDENTIFIER;
}
@Override
public OptionRule optionRule() {
- return DorisSinkOptions.SINK_RULE.build();
+ return OptionRule.builder()
+ .required(
+ DorisSinkOptions.FENODES,
+ DorisSinkOptions.USERNAME,
+ DorisSinkOptions.PASSWORD,
+ DorisSinkOptions.SINK_LABEL_PREFIX,
+ DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX,
+ DorisSinkOptions.DATA_SAVE_MODE,
+ DorisSinkOptions.SCHEMA_SAVE_MODE)
+ .optional(
+ DorisSinkOptions.DATABASE,
+ DorisSinkOptions.TABLE,
+ DorisSinkOptions.TABLE_IDENTIFIER,
+ DorisSinkOptions.QUERY_PORT,
+ DorisSinkOptions.DORIS_BATCH_SIZE,
+ DorisSinkOptions.SINK_ENABLE_2PC,
+ DorisSinkOptions.SINK_ENABLE_DELETE,
+ DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
+ DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING,
+ DorisSinkOptions.SINK_CHECK_INTERVAL,
+ DorisSinkOptions.SINK_MAX_RETRIES,
+ DorisSinkOptions.SINK_BUFFER_SIZE,
+ DorisSinkOptions.SINK_BUFFER_COUNT,
+ DorisSinkOptions.DEFAULT_DATABASE,
+ SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .conditional(
+ DorisSinkOptions.DATA_SAVE_MODE,
+ DataSaveMode.CUSTOM_PROCESSING,
+ DorisSinkOptions.CUSTOM_SQL)
+ .build();
}
@Override
@@ -79,7 +108,7 @@ public class DorisSinkFactory implements TableSinkFactory {
TableIdentifier tableId = catalogTable.getTableId();
String tableName;
String databaseName;
- String tableIdentifier = options.get(TABLE_IDENTIFIER);
+ String tableIdentifier =
options.get(DorisSinkOptions.TABLE_IDENTIFIER);
if (StringUtils.isNotEmpty(tableIdentifier)) {
tableName = tableIdentifier.split("\\.")[1];
databaseName = tableIdentifier.split("\\.")[0];
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
index 05f3e408ed..1bdbd86883 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig;
+import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions;
import org.apache.seatunnel.connectors.doris.config.DorisTableConfig;
import org.apache.commons.lang3.StringUtils;
@@ -43,37 +44,36 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
-import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY;
-import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD;
-import static
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.TABLE_LIST;
-
@Slf4j
@AutoService(Factory.class)
public class DorisSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return IDENTIFIER;
+ return DorisSourceOptions.IDENTIFIER;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(FENODES, USERNAME, PASSWORD)
- .optional(TABLE_LIST)
- .optional(DATABASE)
- .optional(TABLE)
- .optional(DORIS_FILTER_QUERY)
- .optional(DORIS_READ_FIELD)
- .optional(QUERY_PORT)
- .optional(DORIS_BATCH_SIZE)
+ .required(
+ DorisSourceOptions.FENODES,
+ DorisSourceOptions.USERNAME,
+ DorisSourceOptions.PASSWORD)
+ .optional(DorisSourceOptions.TABLE_LIST)
+ .optional(DorisSourceOptions.DATABASE)
+ .optional(DorisSourceOptions.TABLE)
+ .optional(DorisSourceOptions.DORIS_FILTER_QUERY)
+ .optional(DorisSourceOptions.DORIS_TABLET_SIZE)
+ .optional(DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS)
+ .optional(DorisSourceOptions.DORIS_REQUEST_READ_TIMEOUT_MS)
+ .optional(DorisSourceOptions.DORIS_REQUEST_QUERY_TIMEOUT_S)
+ .optional(DorisSourceOptions.DORIS_REQUEST_RETRIES)
+ .optional(DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC)
+ .optional(DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE)
+ .optional(DorisSourceOptions.DORIS_READ_FIELD)
+ .optional(DorisSourceOptions.QUERY_PORT)
+ .optional(DorisSourceOptions.DORIS_BATCH_SIZE)
+ .optional(DorisSourceOptions.DORIS_EXEC_MEM_LIMIT)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
index e68366cf26..6ec8d47c5c 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.backend.BackendClient;
import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig;
+import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition;
@@ -45,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_DEFAULT_CLUSTER;
import static
org.apache.seatunnel.connectors.doris.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
@Slf4j
@@ -115,7 +115,7 @@ public class DorisValueReader {
private TScanOpenParams openParams() {
TScanOpenParams params = new TScanOpenParams();
- params.setCluster(DORIS_DEFAULT_CLUSTER);
+ params.setCluster(DorisSourceOptions.DORIS_DEFAULT_CLUSTER);
params.setDatabase(partition.getDatabase());
params.setTable(partition.getTable());
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 2a55cc350c..50db297dbf 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -34,7 +34,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions;
import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
@@ -99,10 +99,10 @@ public class DorisCatalogIT extends AbstractDorisIT {
factory = new DorisCatalogFactory();
Map<String, Object> map = new HashMap<>();
- map.put(DorisOptions.FENODES.key(), frontEndNodes);
- map.put(DorisOptions.QUERY_PORT.key(), QUERY_PORT);
- map.put(DorisOptions.USERNAME.key(), USERNAME);
- map.put(DorisOptions.PASSWORD.key(), PASSWORD);
+ map.put(DorisBaseOptions.FENODES.key(), frontEndNodes);
+ map.put(DorisBaseOptions.QUERY_PORT.key(), QUERY_PORT);
+ map.put(DorisBaseOptions.USERNAME.key(), USERNAME);
+ map.put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
catalog = (DorisCatalog) factory.createCatalog(catalogName,
ReadonlyConfig.fromMap(map));
@@ -163,10 +163,10 @@ public class DorisCatalogIT extends AbstractDorisIT {
new HashMap<String, Object>() {
{
put(
- DorisOptions.FENODES.key(),
+ DorisBaseOptions.FENODES.key(),
container.getHost() + ":" + HTTP_PORT);
- put(DorisOptions.USERNAME.key(), USERNAME);
- put(DorisOptions.PASSWORD.key(), PASSWORD);
+ put(DorisBaseOptions.USERNAME.key(), USERNAME);
+ put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
}
});
assertCreateTable(upstreamTable, config, "test.test");
@@ -176,12 +176,12 @@ public class DorisCatalogIT extends AbstractDorisIT {
new HashMap<String, Object>() {
{
put(
- DorisOptions.FENODES.key(),
+ DorisBaseOptions.FENODES.key(),
container.getHost() + ":" + HTTP_PORT);
- put(DorisOptions.DATABASE.key(), "test2");
- put(DorisOptions.TABLE.key(), "test2");
- put(DorisOptions.USERNAME.key(), USERNAME);
- put(DorisOptions.PASSWORD.key(), PASSWORD);
+ put(DorisBaseOptions.DATABASE.key(), "test2");
+ put(DorisBaseOptions.TABLE.key(), "test2");
+ put(DorisBaseOptions.USERNAME.key(), USERNAME);
+ put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
}
});
assertCreateTable(upstreamTable, config2, "test2.test2");
@@ -191,11 +191,11 @@ public class DorisCatalogIT extends AbstractDorisIT {
new HashMap<String, Object>() {
{
put(
- DorisOptions.FENODES.key(),
+ DorisBaseOptions.FENODES.key(),
container.getHost() + ":" + HTTP_PORT);
- put(DorisOptions.TABLE_IDENTIFIER.key(),
"test3.test3");
- put(DorisOptions.USERNAME.key(), USERNAME);
- put(DorisOptions.PASSWORD.key(), PASSWORD);
+ put(DorisSinkOptions.TABLE_IDENTIFIER.key(),
"test3.test3");
+ put(DorisBaseOptions.USERNAME.key(), USERNAME);
+ put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
}
});
assertCreateTable(upstreamTable, config3, "test3.test3");
@@ -205,12 +205,12 @@ public class DorisCatalogIT extends AbstractDorisIT {
new HashMap<String, Object>() {
{
put(
- DorisOptions.FENODES.key(),
+ DorisBaseOptions.FENODES.key(),
container.getHost() + ":" + HTTP_PORT);
- put(DorisOptions.DATABASE.key(), "test5");
- put(DorisOptions.TABLE.key(), "${table_name}");
- put(DorisOptions.USERNAME.key(), USERNAME);
- put(DorisOptions.PASSWORD.key(), PASSWORD);
+ put(DorisBaseOptions.DATABASE.key(), "test5");
+ put(DorisBaseOptions.TABLE.key(),
"${table_name}");
+ put(DorisBaseOptions.USERNAME.key(), USERNAME);
+ put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
}
});
assertCreateTable(upstreamTable, config4, "test5.test");
@@ -220,12 +220,12 @@ public class DorisCatalogIT extends AbstractDorisIT {
new HashMap<String, Object>() {
{
put(
- DorisOptions.FENODES.key(),
+ DorisBaseOptions.FENODES.key(),
container.getHost() + ":" + HTTP_PORT);
- put(DorisOptions.DATABASE.key(), "test4");
- put(DorisOptions.TABLE.key(), "test4");
- put(DorisOptions.USERNAME.key(), USERNAME);
- put(DorisOptions.PASSWORD.key(), PASSWORD);
+ put(DorisBaseOptions.DATABASE.key(), "test4");
+ put(DorisBaseOptions.TABLE.key(), "test4");
+ put(DorisBaseOptions.USERNAME.key(), USERNAME);
+ put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
put(DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true);
}
});
@@ -295,21 +295,29 @@ public class DorisCatalogIT extends AbstractDorisIT {
ReadonlyConfig.fromMap(
new HashMap<String, Object>() {
{
-
put(DorisOptions.DATABASE.key(), DATABASE);
-
put(DorisOptions.TABLE.key(), SINK_TABLE);
-
put(DorisOptions.USERNAME.key(), USERNAME);
-
put(DorisOptions.PASSWORD.key(), PASSWORD);
+ put(
+
DorisBaseOptions.DATABASE.key(),
+ DATABASE);
+ put(
+
DorisBaseOptions.TABLE.key(),
+ SINK_TABLE);
+ put(
+
DorisBaseOptions.USERNAME.key(),
+ USERNAME);
+ put(
+
DorisBaseOptions.PASSWORD.key(),
+ PASSWORD);
put(
DorisSourceOptions.DORIS_READ_FIELD
.key(),
"k1,k2");
put(
-
DorisOptions.FENODES.key(),
+
DorisBaseOptions.FENODES.key(),
container.getHost()
+ ":"
+
HTTP_PORT);
put(
-
DorisOptions.QUERY_PORT.key(),
+
DorisBaseOptions.QUERY_PORT.key(),
QUERY_PORT);
}
}),