This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8aaaded [improve] add auto redirect and uniq default open 2pc (#202)
8aaaded is described below
commit 8aaadedaf3fb3147a134e333603cb1af3d7530cf
Author: wudi <[email protected]>
AuthorDate: Sun Oct 8 14:31:37 2023 +0800
[improve] add auto redirect and uniq default open 2pc (#202)
---
.../doris/flink/cfg/DorisConnectionOptions.java | 27 +++++++++++++++++++---
.../doris/flink/cfg/DorisExecutionOptions.java | 27 +++++++++++++++++++---
.../org/apache/doris/flink/cfg/DorisOptions.java | 12 +++++++---
.../flink/exception/ConnectedFailedException.java | 4 ++--
.../org/apache/doris/flink/rest/RestService.java | 20 ++++++++++++++++
.../apache/doris/flink/rest/models/BackendV2.java | 8 +++++++
.../org/apache/doris/flink/sink/DorisSink.java | 16 +++++++++++++
.../doris/flink/table/DorisConfigOptions.java | 4 ++--
.../flink/table/DorisDynamicTableFactory.java | 9 ++++++--
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 10 ++++++--
10 files changed, 120 insertions(+), 17 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 1382dde..541e4e5 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -32,6 +32,11 @@ public class DorisConnectionOptions implements Serializable {
protected final String password;
protected String jdbcUrl;
protected String benodes;
+ /**
+ * Used to enable automatic redirection of fe,
+ * When it is not enabled, it will actively request the be list, and the
polling will initiate a streamload request to be.
+ */
+ protected boolean autoRedirect;
public DorisConnectionOptions(String fenodes, String username, String
password) {
this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is
empty");
@@ -45,10 +50,11 @@ public class DorisConnectionOptions implements Serializable
{
}
public DorisConnectionOptions(String fenodes, String benodes, String
username, String password,
- String jdbcUrl) {
+ String jdbcUrl, boolean autoRedirect) {
this(fenodes, username, password);
this.benodes = benodes;
this.jdbcUrl = jdbcUrl;
+ this.autoRedirect = autoRedirect;
}
public String getFenodes() {
@@ -71,21 +77,31 @@ public class DorisConnectionOptions implements Serializable
{
return jdbcUrl;
}
+ public boolean isAutoRedirect() {
+ return autoRedirect;
+ }
+
/**
* Builder for {@link DorisConnectionOptions}.
*/
public static class DorisConnectionOptionsBuilder {
private String fenodes;
+ private String benodes;
private String username;
private String password;
-
private String jdbcUrl;
+ private boolean autoRedirect;
public DorisConnectionOptionsBuilder withFenodes(String fenodes) {
this.fenodes = fenodes;
return this;
}
+ public DorisConnectionOptionsBuilder withBenodes(String benodes) {
+ this.benodes = benodes;
+ return this;
+ }
+
public DorisConnectionOptionsBuilder withUsername(String username) {
this.username = username;
return this;
@@ -101,8 +117,13 @@ public class DorisConnectionOptions implements
Serializable {
return this;
}
+ public DorisConnectionOptionsBuilder withAutoRedirect(boolean
autoRedirect) {
+ this.autoRedirect = autoRedirect;
+ return this;
+ }
+
public DorisConnectionOptions build() {
- return new DorisConnectionOptions(fenodes, username, password,
jdbcUrl);
+ return new DorisConnectionOptions(fenodes, benodes, username,
password, jdbcUrl, autoRedirect);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 2422df8..8f7022d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -49,7 +49,8 @@ public class DorisExecutionOptions implements Serializable {
*/
private final Properties streamLoadProp;
private final Boolean enableDelete;
- private final Boolean enable2PC;
+ private Boolean enable2PC;
+ private boolean force2PC;
//batch mode param
private final int flushQueueSize;
@@ -73,7 +74,8 @@ public class DorisExecutionOptions implements Serializable {
int bufferFlushMaxRows,
int bufferFlushMaxBytes,
long bufferFlushIntervalMs,
- boolean ignoreUpdateBefore) {
+ boolean ignoreUpdateBefore,
+ boolean force2PC) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
@@ -84,6 +86,7 @@ public class DorisExecutionOptions implements Serializable {
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
this.enable2PC = enable2PC;
+ this.force2PC = force2PC;
this.enableBatchMode = enableBatchMode;
this.flushQueueSize = flushQueueSize;
@@ -176,6 +179,14 @@ public class DorisExecutionOptions implements Serializable
{
return ignoreUpdateBefore;
}
+ public void setEnable2PC(Boolean enable2PC) {
+ this.enable2PC = enable2PC;
+ }
+
+ public boolean force2PC() {
+ return force2PC;
+ }
+
/**
* Builder of {@link DorisExecutionOptions}.
*/
@@ -190,6 +201,9 @@ public class DorisExecutionOptions implements Serializable {
private boolean enableDelete = true;
private boolean enable2PC = true;
+ //A flag used to determine whether to forcibly open 2pc. By default,
the uniq model close 2pc.
+ private boolean force2PC = false;
+
private int flushQueueSize = DEFAULT_FLUSH_QUEUE_SIZE;
private int bufferFlushMaxRows = DEFAULT_BUFFER_FLUSH_MAX_ROWS;
private int bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
@@ -244,6 +258,13 @@ public class DorisExecutionOptions implements Serializable
{
return this;
}
+ public Builder enable2PC() {
+ this.enable2PC = true;
+ //Force open 2pc
+ this.force2PC = true;
+ return this;
+ }
+
public Builder enableBatchMode() {
this.enableBatchMode = true;
return this;
@@ -278,7 +299,7 @@ public class DorisExecutionOptions implements Serializable {
public DorisExecutionOptions build() {
return new DorisExecutionOptions(checkInterval, maxRetries,
bufferSize, bufferCount, labelPrefix, useCache,
streamLoadProp, enableDelete, enable2PC, enableBatchMode,
flushQueueSize, bufferFlushMaxRows,
- bufferFlushMaxBytes, bufferFlushIntervalMs,
ignoreUpdateBefore);
+ bufferFlushMaxBytes, bufferFlushIntervalMs,
ignoreUpdateBefore, force2PC);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index cf7b932..f560eae 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -43,8 +43,8 @@ public class DorisOptions extends DorisConnectionOptions {
}
public DorisOptions(String fenodes, String beNodes, String username,
String password,
- String tableIdentifier, String jdbcUrl) {
- super(fenodes, beNodes, username, password, jdbcUrl);
+ String tableIdentifier, String jdbcUrl, boolean redirect) {
+ super(fenodes, beNodes, username, password, jdbcUrl, redirect);
this.tableIdentifier = tableIdentifier;
}
@@ -70,6 +70,7 @@ public class DorisOptions extends DorisConnectionOptions {
private String jdbcUrl;
private String username;
private String password;
+ private boolean autoRedirect;
private String tableIdentifier;
/**
@@ -120,10 +121,15 @@ public class DorisOptions extends DorisConnectionOptions {
return this;
}
+ public Builder setAutoRedirect(boolean autoRedirect) {
+ this.autoRedirect = autoRedirect;
+ return this;
+ }
+
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
- return new DorisOptions(fenodes, benodes, username, password,
tableIdentifier, jdbcUrl);
+ return new DorisOptions(fenodes, benodes, username, password,
tableIdentifier, jdbcUrl, autoRedirect);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
index 6f755b7..8ab00ca 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
@@ -19,10 +19,10 @@ package org.apache.doris.flink.exception;
public class ConnectedFailedException extends DorisRuntimeException {
public ConnectedFailedException(String server, Throwable cause) {
- super("Connect to " + server + "failed.", cause);
+ super("Connect to " + server + " failed.", cause);
}
public ConnectedFailedException(String server, int statusCode, Throwable
cause) {
- super("Connect to " + server + "failed, status code is " + statusCode
+ ".", cause);
+ super("Connect to " + server + " failed, status code is " + statusCode
+ ".", cause);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index b75586a..1633c96 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -356,6 +356,11 @@ public class RestService implements Serializable {
public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions
options, DorisReadOptions readOptions, Logger logger) {
String feNodes = options.getFenodes();
List<String> feNodeList = allEndpoints(feNodes, logger);
+
+ if(options.isAutoRedirect() && !feNodeList.isEmpty()){
+ return convert(feNodeList);
+ }
+
for (String feNode: feNodeList) {
try {
String beUrl = "http://" + feNode + BACKENDS_V2;
@@ -373,6 +378,21 @@ public class RestService implements Serializable {
throw new DorisRuntimeException(errMsg);
}
+ /**
+ * When the user turns on redirection,
+ * there is no need to explicitly obtain the be list, just treat the fe
list as the be list.
+ * @param feNodeList
+ * @return
+ */
+ private static List<BackendV2.BackendRowV2> convert(List<String>
feNodeList){
+ List<BackendV2.BackendRowV2> nodeList = new ArrayList<>();
+ for(String node : feNodeList){
+ String[] split = node.split(":");
+ nodeList.add(BackendV2.BackendRowV2.of(split[0],
Integer.valueOf(split[1]), true));
+ }
+ return nodeList;
+ }
+
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger
logger) {
ObjectMapper mapper = new ObjectMapper();
BackendV2 backend;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index b2f42db..98f9093 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -75,5 +75,13 @@ public class BackendV2 {
return ip + ":" + httpPort;
}
+ public static BackendRowV2 of(String ip, int httpPort, boolean alive){
+ BackendRowV2 rowV2 = new BackendRowV2();
+ rowV2.setIp(ip);
+ rowV2.setHttpPort(httpPort);
+ rowV2.setAlive(alive);
+ return rowV2;
+ }
+
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index d64e488..d1aee44 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.sink;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.committer.DorisCommitter;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.DorisWriter;
@@ -31,6 +32,8 @@ import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@@ -43,6 +46,7 @@ import java.util.Optional;
*/
public class DorisSink<IN> implements Sink<IN, DorisCommittable,
DorisWriterState, DorisCommittable> {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private final DorisExecutionOptions dorisExecutionOptions;
@@ -56,6 +60,18 @@ public class DorisSink<IN> implements Sink<IN,
DorisCommittable, DorisWriterStat
this.dorisReadOptions = dorisReadOptions;
this.dorisExecutionOptions = dorisExecutionOptions;
this.serializer = serializer;
+ checkKeyType();
+ }
+
+ /**
+ * The uniq model has 2pc close by default unless 2pc is forced open
+ */
+ private void checkKeyType() {
+ if (dorisExecutionOptions.enabled2PC()
+ && !dorisExecutionOptions.force2PC()
+ && RestService.isUniqueKeyType(dorisOptions, dorisReadOptions,
LOG)){
+ dorisExecutionOptions.setEnable2PC(false);
+ }
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 50b205c..af01ea5 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -44,8 +44,8 @@ public class DorisConfigOptions {
public static final ConfigOption<String> TABLE_IDENTIFIER =
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
doris table name.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
doris user name.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
doris password.");
-
public static final ConfigOption<String> JDBC_URL =
ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris
jdbc url address.");
+ public static final ConfigOption<Boolean> AUTO_REDIRECT =
ConfigOptions.key("auto-redirect").booleanType().defaultValue(false).withDescription("Use
automatic redirection of fe without explicitly obtaining the be list");
// source config options
public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
@@ -176,7 +176,7 @@ public class DorisConfigOptions {
public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
.key("sink.buffer-size")
.intType()
- .defaultValue(256 * 1024)
+ .defaultValue(1024 * 1024)
.withDescription("the buffer size to cache data for stream load.");
public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions
.key("sink.buffer-count")
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 9583236..978fa27 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -20,8 +20,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-
-import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
@@ -38,6 +36,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static org.apache.doris.flink.table.DorisConfigOptions.AUTO_REDIRECT;
+import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
@@ -179,6 +179,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setBenodes(benodes)
+ .setAutoRedirect(readableConfig.get(AUTO_REDIRECT))
.setJdbcUrl(readableConfig.get(JDBC_URL))
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
@@ -214,8 +215,12 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
builder.setStreamLoadProp(streamLoadProp);
builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE));
+
if (!readableConfig.get(SINK_ENABLE_2PC)) {
builder.disable2PC();
+ } else if (readableConfig.getOptional(SINK_ENABLE_2PC).isPresent()){
+ //force open 2pc
+ builder.enable2PC();
}
if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index a28403e..8aef65d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -137,6 +137,7 @@ public abstract class DatabaseSync {
private DorisConnectionOptions getDorisConnectionOptions() {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+ String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
@@ -145,6 +146,7 @@ public abstract class DatabaseSync {
Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf");
DorisConnectionOptions.DorisConnectionOptionsBuilder builder = new
DorisConnectionOptions.DorisConnectionOptionsBuilder()
.withFenodes(fenodes)
+ .withBenodes(benodes)
.withUsername(user)
.withPassword(passwd)
.withJdbcUrl(jdbcUrl);
@@ -168,6 +170,7 @@ public abstract class DatabaseSync {
.setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd);
+
sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT).ifPresent(dorisBuilder::setAutoRedirect);
Properties pro = new Properties();
//default json data format
@@ -187,9 +190,12 @@ public abstract class DatabaseSync {
sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries);
sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE).ifPresent(executionBuilder::setIgnoreUpdateBefore);
- boolean enable2pc =
sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC);
- if(!enable2pc){
+
+ if(!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)){
executionBuilder.disable2PC();
+ } else
if(sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()){
+ //force open 2pc
+ executionBuilder.enable2PC();
}
//batch option
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]