This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 32fd54e add multisink to DorisBatchSink (#223)
32fd54e is described below
commit 32fd54e3c1595897d02a188b00badebd0b9ef4e2
Author: wudi <[email protected]>
AuthorDate: Mon Nov 6 16:27:08 2023 +0800
add multisink to DorisBatchSink (#223)
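Support writing to multiple tables through a single DorisBatchSink. Each input
element is a RecordWithMeta carrying its target database, its target table, and
the row data as a string. Rows are buffered per database.table, and each buffer
is stream-loaded into its own table. Elements whose database or table is null or
whitespace-only, or whose row is null, are skipped with a warning. In this mode
the sink-level table identifier may be left empty, as in the example below.
Example: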
```java
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DorisBatchSink.Builder<RecordWithMeta> builder =
DorisBatchSink.builder();
final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("127.0.0.1:8030")
.setTableIdentifier("")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label")
.setStreamLoadProp(properties)
.setDeletable(false)
.setBufferFlushMaxBytes(8 * 1024)
.setBufferFlushMaxRows(10)
.setBufferFlushIntervalMs(1000 * 10);
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build());
//Multiple table writing
RecordWithMeta record = new RecordWithMeta("test",
"test_flink_tmp1", "wangwu,1");
RecordWithMeta record1 = new RecordWithMeta("test",
"test_flink_tmp", "wangwu,1");
DataStreamSource<RecordWithMeta> source =
env.fromCollection(Arrays.asList(record, record1));
source.sinkTo(builder.build());
```
---
.../doris/flink/sink/batch/BatchRecordBuffer.java | 25 +++++
.../doris/flink/sink/batch/DorisBatchSink.java | 1 -
.../flink/sink/batch/DorisBatchStreamLoad.java | 109 ++++++++++-----------
.../doris/flink/sink/batch/DorisBatchWriter.java | 29 +++++-
.../doris/flink/sink/batch/RecordWithMeta.java | 62 ++++++++++++
.../doris/flink/sink/writer/LabelGenerator.java | 4 +
.../doris/flink/DorisSinkMultiTableExample.java | 101 +++++++++++++++++++
7 files changed, 271 insertions(+), 60 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 5fa601d..1de6253 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -36,6 +36,8 @@ public class BatchRecordBuffer {
private int numOfRecords = 0;
private int bufferSizeBytes = 0;
private boolean loadBatchFirstRecord = true;
+ private String database;
+ private String table;
public BatchRecordBuffer(){}
@@ -45,6 +47,14 @@ public class BatchRecordBuffer {
this.buffer = ByteBuffer.allocate(bufferSize);
}
+ public BatchRecordBuffer(String database, String table, byte[]
lineDelimiter, int bufferSize) {
+ super();
+ this.database = database;
+ this.table = table;
+ this.lineDelimiter = lineDelimiter;
+ this.buffer = ByteBuffer.allocate(bufferSize);
+ }
+
public void insert(byte[] record) {
ensureCapacity(record.length);
if(loadBatchFirstRecord){
@@ -141,4 +151,19 @@ public class BatchRecordBuffer {
this.bufferSizeBytes = bufferSizeBytes;
}
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
index 2c578d4..37d3973 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -87,7 +87,6 @@ public class DorisBatchSink<IN> implements Sink<IN> {
public DorisBatchSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
- Preconditions.checkNotNull(serializer);
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index b6a3f65..6adb436 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.batch;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,8 +30,7 @@ import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -44,9 +44,10 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -77,16 +78,14 @@ public class DorisBatchStreamLoad implements Serializable {
private String hostPort;
private final String username;
private final String password;
- private final String db;
- private final String table;
private final Properties loadProps;
- private BatchRecordBuffer buffer;
+ private Map<String, BatchRecordBuffer> bufferMap = new
ConcurrentHashMap<>();
private DorisExecutionOptions executionOptions;
private ExecutorService loadExecutorService;
private LoadAsyncExecutor loadAsyncExecutor;
- private BlockingQueue<BatchRecordBuffer> writeQueue;
- private BlockingQueue<BatchRecordBuffer> readQueue;
+ private BlockingQueue<BatchRecordBuffer> flushQueue;
private final AtomicBoolean started;
+ private volatile boolean loadThreadAlive = false;
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();
private BackendUtil backendUtil;
@@ -99,24 +98,18 @@ public class DorisBatchStreamLoad implements Serializable {
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions,
dorisReadOptions, LOG));
this.hostPort = backendUtil.getAvailableBackend();
- String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
- this.db = tableInfo[0];
- this.table = tableInfo[1];
this.username = dorisOptions.getUsername();
this.password = dorisOptions.getPassword();
- this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
this.loadProps = executionOptions.getStreamLoadProp();
this.labelGenerator = labelGenerator;
this.lineDelimiter =
EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT)).getBytes();
this.executionOptions = executionOptions;
- //init queue
- this.writeQueue = new
ArrayBlockingQueue<>(executionOptions.getFlushQueueSize());
- LOG.info("init RecordBuffer capacity {}, count {}",
executionOptions.getBufferFlushMaxBytes(),
executionOptions.getFlushQueueSize());
- for (int index = 0; index < executionOptions.getFlushQueueSize();
index++) {
- this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter,
executionOptions.getBufferFlushMaxBytes()));
+ this.flushQueue = new
LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
+ if(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())){
+ String[] tableInfo =
dorisOptions.getTableIdentifier().split("\\.");
+ Preconditions.checkState(tableInfo.length == 2, "tableIdentifier
input error, the format is database.table");
+ this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort,
tableInfo[0], tableInfo[1]);
}
- readQueue = new LinkedBlockingDeque<>();
-
this.loadAsyncExecutor= new LoadAsyncExecutor();
this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new
DefaultThreadFactory("streamload-executor"), new
ThreadPoolExecutor.AbortPolicy());
this.started = new AtomicBoolean(true);
@@ -128,26 +121,26 @@ public class DorisBatchStreamLoad implements Serializable
{
* @param record
* @throws IOException
*/
- public synchronized void writeRecord(byte[] record) throws
InterruptedException {
+ public synchronized void writeRecord(String database, String table, byte[]
record) throws InterruptedException {
checkFlushException();
- if(buffer == null){
- buffer = takeRecordFromWriteQueue();
- }
+ String bufferKey = getTableIdentifier(database, table);
+ BatchRecordBuffer buffer = bufferMap.computeIfAbsent(bufferKey, k ->
new BatchRecordBuffer(database, table, this.lineDelimiter,
executionOptions.getBufferFlushMaxBytes()));
buffer.insert(record);
//When it exceeds 80% of the byteSize,to flush, to avoid triggering
bytebuffer expansion
if (buffer.getBufferSizeBytes() >=
executionOptions.getBufferFlushMaxBytes() * 0.8
|| (executionOptions.getBufferFlushMaxRows() != 0 &&
buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
- flush(false);
+ flush(bufferKey,false);
}
}
- public synchronized void flush(boolean waitUtilDone) throws
InterruptedException {
+ public synchronized void flush(String bufferKey, boolean waitUtilDone)
throws InterruptedException {
checkFlushException();
- if (buffer != null && !buffer.isEmpty()) {
- buffer.setLabelName(labelGenerator.generateBatchLabel());
- BatchRecordBuffer tmpBuff = buffer;
- readQueue.put(tmpBuff);
- this.buffer = null;
+ if (null == bufferKey) {
+ for (String key : bufferMap.keySet()) {
+ flushBuffer(key);
+ }
+ } else if (bufferMap.containsKey(bufferKey)) {
+ flushBuffer(bufferKey);
}
if (waitUtilDone) {
@@ -155,20 +148,22 @@ public class DorisBatchStreamLoad implements Serializable
{
}
}
- private void putRecordToWriteQueue(BatchRecordBuffer buffer){
- try {
- writeQueue.put(buffer);
- } catch (InterruptedException e) {
- throw new RuntimeException("Failed to recycle a buffer to queue");
- }
+ private synchronized void flushBuffer(String bufferKey) {
+ BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
+ putRecordToFlushQueue(buffer);
+ bufferMap.remove(bufferKey);
}
- private BatchRecordBuffer takeRecordFromWriteQueue(){
+ private void putRecordToFlushQueue(BatchRecordBuffer buffer){
checkFlushException();
+ if(!loadThreadAlive){
+ throw new RuntimeException("load thread already exit, write was
interrupted");
+ }
try {
- return writeQueue.take();
+ flushQueue.put(buffer);
} catch (InterruptedException e) {
- throw new RuntimeException("Failed to take a buffer from queue");
+ throw new RuntimeException("Failed to put record buffer to flush
queue");
}
}
@@ -178,31 +173,34 @@ public class DorisBatchStreamLoad implements Serializable
{
}
}
- private void waitAsyncLoadFinish() throws InterruptedException {
+ private void waitAsyncLoadFinish() {
for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){
- BatchRecordBuffer empty = takeRecordFromWriteQueue();
- readQueue.put(empty);
+ BatchRecordBuffer empty = new BatchRecordBuffer();
+ putRecordToFlushQueue(empty);
}
}
+ private String getTableIdentifier(String database, String table) {
+ return database + "." + table;
+ }
+
public void close(){
//close async executor
this.loadExecutorService.shutdown();
this.started.set(false);
-
//clear buffer
- this.writeQueue.clear();
- this.readQueue.clear();
+ this.flushQueue.clear();
}
class LoadAsyncExecutor implements Runnable {
@Override
public void run() {
LOG.info("LoadAsyncExecutor start");
+ loadThreadAlive = true;
while (started.get()) {
BatchRecordBuffer buffer = null;
try {
- buffer = readQueue.poll(2000L, TimeUnit.MILLISECONDS);
+ buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
if(buffer == null){
continue;
}
@@ -212,23 +210,20 @@ public class DorisBatchStreamLoad implements Serializable
{
} catch (Exception e) {
LOG.error("worker running error", e);
exception.set(e);
+ //clear queue to avoid writer thread blocking
+ flushQueue.clear();
break;
- } finally {
- //Recycle buffer to avoid writer thread blocking
- if(buffer != null){
- buffer.clear();
- putRecordToWriteQueue(buffer);
- }
}
}
LOG.info("LoadAsyncExecutor stop");
+ loadThreadAlive = false;
}
/**
* execute stream load
*/
public void load(String label, BatchRecordBuffer buffer) throws
IOException{
- refreshLoadUrl();
+ refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
ByteBuffer data = buffer.getData();
ByteArrayEntity entity = new ByteArrayEntity(data.array(),
data.arrayOffset(), data.limit());
HttpPutBuilder putBuilder = new HttpPutBuilder();
@@ -266,14 +261,16 @@ public class DorisBatchStreamLoad implements Serializable
{
}
retry++;
// get available backend retry
- refreshLoadUrl();
+ refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
putBuilder.setUrl(loadUrl);
}
+ buffer.clear();
+ buffer = null;
}
- private void refreshLoadUrl(){
+ private void refreshLoadUrl(String database, String table){
hostPort = backendUtil.getAvailableBackend();
- loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+ loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database,
table);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index d4621c7..6b2ce02 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -24,11 +24,14 @@ import
org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -46,12 +49,20 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN>
{
private final DorisRecordSerializer<IN> serializer;
private final transient ScheduledExecutorService scheduledExecutorService;
private transient volatile Exception flushException = null;
+ private String database;
+ private String table;
public DorisBatchWriter(Sink.InitContext initContext,
DorisRecordSerializer<IN> serializer,
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions) {
+
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+ String[] tableInfo =
dorisOptions.getTableIdentifier().split("\\.");
+ Preconditions.checkState(tableInfo.length == 2, "tableIdentifier
input error, the format is database.table");
+ this.database = tableInfo[0];
+ this.table = tableInfo[1];
+ }
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.labelPrefix = executionOptions.getLabelPrefix() + "_" +
initContext.getSubtaskId();
this.labelGenerator = new LabelGenerator(labelPrefix, false);
@@ -72,7 +83,7 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
private void intervalFlush() {
try {
LOG.info("interval flush triggered.");
- batchStreamLoad.flush(false);
+ batchStreamLoad.flush(null, false);
} catch (InterruptedException e) {
flushException = e;
}
@@ -81,18 +92,30 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN>
{
@Override
public void write(IN in, Context context) throws IOException,
InterruptedException {
checkFlushException();
+ if(in instanceof RecordWithMeta){
+ RecordWithMeta row = (RecordWithMeta) in;
+ if(StringUtils.isNullOrWhitespaceOnly(row.getTable())
+ ||StringUtils.isNullOrWhitespaceOnly(row.getDatabase())
+ ||row.getRecord() == null){
+ LOG.warn("Record or meta format is incorrect, ignore record
db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
+ return;
+ }
+ batchStreamLoad.writeRecord(row.getDatabase(), row.getTable(),
row.getRecord().getBytes(StandardCharsets.UTF_8));
+ return;
+ }
+
byte[] serialize = serializer.serialize(in);
if(Objects.isNull(serialize)){
//ddl record
return;
}
- batchStreamLoad.writeRecord(serialize);
+ batchStreamLoad.writeRecord(database, table, serialize);
}
@Override
public void flush(boolean flush) throws IOException, InterruptedException {
checkFlushException();
LOG.info("checkpoint flush triggered.");
- batchStreamLoad.flush(true);
+ batchStreamLoad.flush(null, true);
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
new file mode 100644
index 0000000..7f4d269
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+public class RecordWithMeta {
+ private String database;
+ private String table;
+ private String record;
+
+ public RecordWithMeta() {
+ }
+
+ public RecordWithMeta(String database, String table, String record) {
+ this.database = database;
+ this.table = table;
+ this.record = record;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public String getRecord() {
+ return record;
+ }
+
+ public void setRecord(String record) {
+ this.record = record;
+ }
+
+ public String getTableIdentifier() {
+ return this.database + "." + this.table;
+ }
+
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 235b553..55f7811 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -38,4 +38,8 @@ public class LabelGenerator {
public String generateBatchLabel() {
return labelPrefix + "_" + UUID.randomUUID();
}
+
+ public String generateBatchLabel(String table) {
+ return String.format("%s_%s_%s", labelPrefix, table,
UUID.randomUUID());
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
new file mode 100644
index 0000000..eade292
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkMultiTableExample {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DorisBatchSink.Builder<RecordWithMeta> builder =
DorisBatchSink.builder();
+ final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
+
+ readOptionBuilder.setDeserializeArrowAsync(false)
+ .setDeserializeQueueSize(64)
+ .setExecMemLimit(2147483648L)
+ .setRequestQueryTimeoutS(3600)
+ .setRequestBatchSize(1000)
+ .setRequestConnectTimeoutMs(10000)
+ .setRequestReadTimeoutMs(10000)
+ .setRequestRetries(3)
+ .setRequestTabletSize(1024 * 1024);
+
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("line_delimiter", "\n");
+ properties.setProperty("format", "csv");
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.test_flink_tmp")
+ .setUsername("root")
+ .setPassword("");
+
+ DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+
+ executionBuilder.setLabelPrefix("label")
+ .setStreamLoadProp(properties)
+ .setDeletable(false)
+ .setBufferFlushMaxBytes(8 * 1024)
+ .setBufferFlushMaxRows(10)
+ .setBufferFlushIntervalMs(1000 * 10);
+
+ builder.setDorisReadOptions(readOptionBuilder.build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setDorisOptions(dorisBuilder.build());
+
+// RecordWithMeta record = new RecordWithMeta("test",
"test_flink_tmp1", "wangwu,1");
+// RecordWithMeta record1 = new RecordWithMeta("test",
"test_flink_tmp", "wangwu,1");
+// DataStreamSource<RecordWithMeta> stringDataStreamSource =
env.fromCollection(
+// Arrays.asList(record, record1));
+// stringDataStreamSource.sinkTo(builder.build());
+
+ env.addSource(new SourceFunction<RecordWithMeta>() {
+ private Long id = 1000000L;
+ @Override
+ public void run(SourceContext<RecordWithMeta> out) throws
Exception {
+ while (true) {
+ id = id + 1;
+ RecordWithMeta record = new RecordWithMeta("test",
"test_flink_tmp1", UUID.randomUUID() + ",1");
+ out.collect(record);
+ record = new RecordWithMeta("test", "test_flink_tmp",
UUID.randomUUID() + ",1");
+ out.collect(record);
+ Thread.sleep(3000);
+ }
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+ }).sinkTo(builder.build());
+
+ env.execute("doris multi table test");
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]