This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8c15c4f [Improve]Added direct access to BE through the intranet (#187)
8c15c4f is described below
commit 8c15c4f0bf2d63507b8ac1fd0b8b6d00a37afb6d
Author: DongLiang-0 <[email protected]>
AuthorDate: Tue Sep 5 15:26:20 2023 +0800
[Improve]Added direct access to BE through the intranet (#187)
---
flink-doris-connector/pom.xml | 8 +-
.../doris/flink/cfg/DorisConnectionOptions.java | 16 +++-
.../org/apache/doris/flink/cfg/DorisOptions.java | 19 +++-
.../org/apache/doris/flink/sink/BackendUtil.java | 32 ++++++-
.../flink/sink/batch/DorisBatchStreamLoad.java | 6 +-
.../doris/flink/sink/committer/DorisCommitter.java | 10 +-
.../doris/flink/sink/writer/DorisWriter.java | 7 +-
.../doris/flink/table/DorisConfigOptions.java | 1 +
.../flink/table/DorisDynamicTableFactory.java | 5 +
.../doris/flink/table/DorisDynamicTableSource.java | 1 +
.../doris/flink/table/DorisRowDataInputFormat.java | 5 +
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 +
.../flink/DorisIntranetAccessSinkExample.java | 105 +++++++++++++++++++++
.../flink/sink/committer/TestDorisCommitter.java | 24 ++++-
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 5 +-
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 5 +-
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 5 +-
.../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 5 +-
18 files changed, 236 insertions(+), 25 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 333a40b..e52a07a 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -224,7 +224,13 @@ under the License.
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>2.27.0</version>
+ <version>4.2.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>4.2.0</version>
<scope>test</scope>
</dependency>
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 00abd52..1382dde 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -31,6 +31,7 @@ public class DorisConnectionOptions implements Serializable {
protected final String username;
protected final String password;
protected String jdbcUrl;
+ protected String benodes;
public DorisConnectionOptions(String fenodes, String username, String
password) {
this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is
empty");
@@ -38,8 +39,15 @@ public class DorisConnectionOptions implements Serializable {
this.password = password;
}
- public DorisConnectionOptions(String fenodes, String username, String
password, String jdbcUrl){
- this(fenodes,username,password);
+ public DorisConnectionOptions(String fenodes, String username, String
password, String jdbcUrl) {
+ this(fenodes, username, password);
+ this.jdbcUrl = jdbcUrl;
+ }
+
+ public DorisConnectionOptions(String fenodes, String benodes, String
username, String password,
+ String jdbcUrl) {
+ this(fenodes, username, password);
+ this.benodes = benodes;
this.jdbcUrl = jdbcUrl;
}
@@ -55,6 +63,10 @@ public class DorisConnectionOptions implements Serializable {
return password;
}
+ public String getBenodes() {
+ return benodes;
+ }
+
public String getJdbcUrl(){
return jdbcUrl;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index c9e36e3..cf7b932 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -42,6 +42,12 @@ public class DorisOptions extends DorisConnectionOptions {
this.tableIdentifier = tableIdentifier;
}
+ public DorisOptions(String fenodes, String beNodes, String username,
String password,
+ String tableIdentifier, String jdbcUrl) {
+ super(fenodes, beNodes, username, password, jdbcUrl);
+ this.tableIdentifier = tableIdentifier;
+ }
+
public String getTableIdentifier() {
return tableIdentifier;
}
@@ -60,7 +66,7 @@ public class DorisOptions extends DorisConnectionOptions {
*/
public static class Builder {
private String fenodes;
-
+ private String benodes;
private String jdbcUrl;
private String username;
private String password;
@@ -98,6 +104,14 @@ public class DorisOptions extends DorisConnectionOptions {
return this;
}
+ /**
+ * optional, Backend Http Port
+ */
+ public Builder setBenodes(String benodes) {
+ this.benodes = benodes;
+ return this;
+ }
+
/**
* not required, fe jdbc url, for lookup query
*/
@@ -109,9 +123,8 @@ public class DorisOptions extends DorisConnectionOptions {
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
- return new DorisOptions(fenodes, username, password,
tableIdentifier, jdbcUrl);
+ return new DorisOptions(fenodes, benodes, username, password,
tableIdentifier, jdbcUrl);
}
}
-
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index 701cad6..9f9516a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -19,11 +19,15 @@ package org.apache.doris.flink.sink;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.models.BackendV2;
+import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
public class BackendUtil {
@@ -36,13 +40,34 @@ public class BackendUtil {
this.pos = 0;
}
+ public BackendUtil(String beNodes) {
+ this.backends = initBackends(beNodes);
+ this.pos = 0;
+ }
+
+ private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
+ List<BackendV2.BackendRowV2> backends = new ArrayList<>();
+ List<String> nodes = Arrays.asList(beNodes.split(","));
+ nodes.forEach(node -> {
+ if (tryHttpConnection(node)) {
+ node = node.trim();
+ String[] ipAndPort = node.split(":");
+ BackendRowV2 backendRowV2 = new BackendRowV2();
+ backendRowV2.setIp(ipAndPort[0]);
+ backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
+ backendRowV2.setAlive(true);
+ backends.add(backendRowV2);
+ }
+ });
+ return backends;
+ }
+
public String getAvailableBackend() {
long tmp = pos + backends.size();
while (pos < tmp) {
- BackendV2.BackendRowV2 backend = backends.get((int) (pos %
backends.size()));
+ BackendV2.BackendRowV2 backend = backends.get((int) (pos++ %
backends.size()));
String res = backend.toBackendString();
- if(tryHttpConnection(res)){
- pos++;
+ if (tryHttpConnection(res)) {
return res;
}
}
@@ -60,7 +85,6 @@ public class BackendUtil {
return true;
} catch (Exception ex) {
LOG.warn("Failed to connect to backend:{}", backend, ex);
- pos++;
return false;
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index a43220c..b6a3f65 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -29,6 +29,8 @@ import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -93,7 +95,9 @@ public class DorisBatchStreamLoad implements Serializable {
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions,
LabelGenerator labelGenerator) {
- this.backendUtil = new
BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
+ this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ?
new BackendUtil(
+ dorisOptions.getBenodes())
+ : new BackendUtil(RestService.getBackendsV2(dorisOptions,
dorisReadOptions, LOG));
this.hostPort = backendUtil.getAvailableBackend();
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.db = tableInfo[0];
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 3b61d82..5bb1a40 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.sink.committer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.sink.Committer;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -25,6 +26,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
@@ -55,6 +57,7 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private final ObjectMapper jsonMapper = new ObjectMapper();
+ private final BackendUtil backendUtil;
int maxRetry;
@@ -67,6 +70,9 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
this.dorisReadOptions = dorisReadOptions;
this.maxRetry = maxRetry;
this.httpClient = client;
+ this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ?
new BackendUtil(
+ dorisOptions.getBenodes())
+ : new BackendUtil(RestService.getBackendsV2(dorisOptions,
dorisReadOptions, LOG));
}
@Override
@@ -116,13 +122,13 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
if (retry == maxRetry) {
throw new DorisRuntimeException("stream load error: " +
reasonPhrase);
}
- hostPort = RestService.getBackend(dorisOptions,
dorisReadOptions, LOG);
+ hostPort = backendUtil.getAvailableBackend();
} catch (IOException e) {
LOG.error("commit transaction failed: ", e);
if (retry == maxRetry) {
throw new IOException("commit transaction failed: {}", e);
}
- hostPort = RestService.getBackend(dorisOptions,
dorisReadOptions, LOG);
+ hostPort = backendUtil.getAvailableBackend();
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 7890670..1f98206 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -27,6 +27,8 @@ import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
@@ -99,8 +101,9 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
}
public void initializeLoad(List<DorisWriterState> state) throws
IOException {
- //cache backend
- backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions,
dorisReadOptions, LOG));
+ this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ?
new BackendUtil(
+ dorisOptions.getBenodes())
+ : new BackendUtil(RestService.getBackendsV2(dorisOptions,
dorisReadOptions, LOG));
try {
this.dorisStreamLoad = new DorisStreamLoad(
backendUtil.getAvailableBackend(),
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index ba408a6..98b9a78 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -40,6 +40,7 @@ public class DorisConfigOptions {
public static final String IDENTIFIER = "doris";
// common option
public static final ConfigOption<String> FENODES =
ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris
fe http address.");
+ public static final ConfigOption<String> BENODES =
ConfigOptions.key("benodes").stringType().noDefaultValue().withDescription("doris
be http address.");
public static final ConfigOption<String> TABLE_IDENTIFIER =
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
doris table name.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
doris user name.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
doris password.");
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 66a6a19..57e4fc0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -20,6 +20,8 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+
+import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
@@ -103,6 +105,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
+ options.add(BENODES);
options.add(TABLE_IDENTIFIER);
options.add(USERNAME);
options.add(PASSWORD);
@@ -169,8 +172,10 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
+ final String benodes = readableConfig.get(BENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
+ .setBenodes(benodes)
.setJdbcUrl(readableConfig.get(JDBC_URL))
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index bd04e20..35a4489 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -104,6 +104,7 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
}
DorisRowDataInputFormat.Builder builder =
DorisRowDataInputFormat.builder()
.setFenodes(options.getFenodes())
+ .setBenodes(options.getBenodes())
.setUsername(options.getUsername())
.setPassword(options.getPassword())
.setTableIdentifier(options.getTableIdentifier())
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index 7181ce6..47bcf69 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -198,6 +198,11 @@ public class DorisRowDataInputFormat extends
RichInputFormat<RowData, DorisTable
return this;
}
+ public Builder setBenodes(String benodes) {
+ this.optionsBuilder.setBenodes(benodes);
+ return this;
+ }
+
public Builder setUsername(String username) {
this.optionsBuilder.setUsername(username);
return this;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index bc15987..455000e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -152,6 +152,7 @@ public abstract class DatabaseSync {
*/
public DorisSink<String> buildDorisSink(String table) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+ String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String labelPrefix =
sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
@@ -159,6 +160,7 @@ public abstract class DatabaseSync {
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(fenodes)
+ .setBenodes(benodes)
.setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
new file mode 100644
index 0000000..61ad1dd
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisIntranetAccessSinkExample.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * When the flink connector accesses doris, it parses out all surviving BE
nodes according to the FE address filled in.
+ * <p>
+ * However, when the BE node is deployed, most of the internal network IP is
filled in,
+ * so the BE node parsed by FE is the internal network IP. When flink is
deployed on a non-intranet segment,
+ * the BE node will be inaccessible on the network.
+ * <p>
+ * In this case, you can access the BE node on the intranet by directly
configuring {@link DorisOptions.builder().setBenodes().build()},
+ * after you configure this parameter, Flink Connector will not parse all BE
nodes through FE nodes.
+ */
+public class DorisIntranetAccessSinkExample {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.enableCheckpointing(10000);
+ env.getCheckpointConfig()
+
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
Time.milliseconds(30000)));
+
+ DorisSink.Builder<String> builder = DorisSink.builder();
+ final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
+ readOptionBuilder.setDeserializeArrowAsync(false)
+ .setDeserializeQueueSize(64)
+ .setExecMemLimit(2147483648L)
+ .setRequestQueryTimeoutS(3600)
+ .setRequestBatchSize(1000)
+ .setRequestConnectTimeoutMs(10000)
+ .setRequestReadTimeoutMs(10000)
+ .setRequestRetries(3)
+ .setRequestTabletSize(1024 * 1024);
+
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("line_delimiter", "\n");
+ properties.setProperty("format", "csv");
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes("10.20.30.1:8030")
+ .setBenodes("10.20.30.1:8040, 10.20.30.2:8040,
10.20.30.3:8040")
+ .setTableIdentifier("test.test_sink")
+ .setUsername("root")
+ .setPassword("");
+
+ DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+ executionBuilder
+ .disable2PC().setLabelPrefix("label-doris")
+ .setStreamLoadProp(properties)
+ .setBufferSize(8 * 1024)
+ .setBufferCount(3);
+
+ builder.setDorisReadOptions(readOptionBuilder.build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(new SimpleStringSerializer())
+ .setDorisOptions(dorisBuilder.build());
+
+ List<Tuple2<Integer, String>> data = new ArrayList<>();
+ data.add(new Tuple2<>(1, "zhangsan"));
+ data.add(new Tuple2<>(2, "lisi"));
+ data.add(new Tuple2<>(3, "wangwu"));
+ DataStreamSource<Tuple2<Integer, String>> source =
env.fromCollection(data);
+ source.map((MapFunction<Tuple2<Integer, String>, String>) t -> t.f0 +
"," + t.f1)
+ .sinkTo(builder.build());
+ env.execute("doris test");
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index e81638f..7cc2a88 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -20,14 +20,18 @@ package org.apache.doris.flink.sink.committer;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
+
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicStatusLine;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -35,8 +39,11 @@ import org.junit.Test;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
+import org.mockito.MockedStatic;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
/**
* Test for Doris Committer.
@@ -46,8 +53,10 @@ public class TestDorisCommitter {
DorisCommitter dorisCommitter;
DorisCommittable dorisCommittable;
HttpEntityMock entityMock;
+ private MockedStatic<RestService> restServiceMockedStatic;
+
@Before
- public void setUp() throws Exception{
+ public void setUp() throws Exception {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions();
dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0);
@@ -55,9 +64,15 @@ public class TestDorisCommitter {
entityMock = new HttpEntityMock();
CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
StatusLine normalLine = new BasicStatusLine(new
ProtocolVersion("http", 1, 0), 200, "");
+ restServiceMockedStatic = mockStatic(RestService.class);
+ Logger mockLogger = mock(Logger.class);
+ mock(RestService.class);
+
when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
+ when(RestService.getBackendsV2(dorisOptions, readOptions,
mockLogger)).thenReturn(
+ Collections.singletonList(new BackendRowV2()));
dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 2,
httpClient);
}
@@ -73,7 +88,7 @@ public class TestDorisCommitter {
}
@Test(expected = DorisRuntimeException.class)
- public void testCommitAbort() throws Exception{
+ public void testCommitAbort() throws Exception {
String response = "{\n" +
"\"status\": \"Fail\",\n" +
"\"msg\": \"errCode = 2, detailMessage = transaction [25] is
already aborted. abort reason: User Abort\"\n" +
@@ -81,4 +96,9 @@ public class TestDorisCommitter {
this.entityMock.setValue(response);
dorisCommitter.commit(Collections.singletonList(dorisCommittable));
}
+
+ @After
+ public void after() {
+ restServiceMockedStatic.close();
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index c20be39..1a205b1 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -52,10 +52,11 @@ public class CdcMysqlSyncDatabaseCase {
Configuration config = Configuration.fromMap(mysqlConfig);
Map<String,String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes","127.0.0.1:8030");
+ sinkConfig.put("fenodes","10.20.30.1:8030");
+ // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040,
10.20.30.3:8040");
sinkConfig.put("username","root");
sinkConfig.put("password","");
- sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
Configuration sinkConf = Configuration.fromMap(sinkConfig);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 08cf586..3a2a39e 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -58,10 +58,11 @@ public class CdcOraclelSyncDatabaseCase {
Configuration config = Configuration.fromMap(sourceConfig);
Map<String,String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes","127.0.0.1:8030");
+ sinkConfig.put("fenodes","10.20.30.1:8030");
+ // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040,
10.20.30.3:8040");
sinkConfig.put("username","root");
sinkConfig.put("password","");
- sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
Configuration sinkConf = Configuration.fromMap(sinkConfig);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 4d5b485..cf5e1d8 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -61,10 +61,11 @@ public class CdcPostgresSyncDatabaseCase {
Configuration config = Configuration.fromMap(sourceConfig);
Map<String,String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes","127.0.0.1:8737");
+ sinkConfig.put("fenodes","10.20.30.1:8030");
+ // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040,
10.20.30.3:8040");
sinkConfig.put("username","root");
sinkConfig.put("password","");
- sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9737");
+ sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
Configuration sinkConf = Configuration.fromMap(sinkConfig);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 96780aa..7251a7f 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -59,10 +59,11 @@ public class CdcSqlServerSyncDatabaseCase {
Configuration config = Configuration.fromMap(sourceConfig);
Map<String,String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes","127.0.0.1:8030");
+ sinkConfig.put("fenodes","10.20.30.1:8030");
+ // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040,
10.20.30.3:8040");
sinkConfig.put("username","root");
sinkConfig.put("password","");
- sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
Configuration sinkConf = Configuration.fromMap(sinkConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]