This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 2e7d791 [fix] fix batch sink flush lost batch (#180)
2e7d791 is described below
commit 2e7d7914393acf7a69d7596120f1be10694adef9
Author: bingquanzhao <[email protected]>
AuthorDate: Mon Aug 21 10:30:58 2023 +0800
[fix] fix batch sink flush lost batch (#180)
---
.../flink/sink/batch/DorisBatchStreamLoad.java | 15 +++--
.../apache/doris/flink/DorisSinkBatchExample.java | 64 ++++++++++++++++++++--
2 files changed, 66 insertions(+), 13 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 1fe217d..a43220c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -139,17 +139,16 @@ public class DorisBatchStreamLoad implements Serializable {
public synchronized void flush(boolean waitUtilDone) throws
InterruptedException {
checkFlushException();
- if (buffer == null) {
- LOG.debug("buffer is empty, skip flush.");
- return;
+ if (buffer != null && !buffer.isEmpty()) {
+ buffer.setLabelName(labelGenerator.generateBatchLabel());
+ BatchRecordBuffer tmpBuff = buffer;
+ readQueue.put(tmpBuff);
+ this.buffer = null;
}
- buffer.setLabelName(labelGenerator.generateBatchLabel());
- BatchRecordBuffer tmpBuff = buffer;
- readQueue.put(tmpBuff);
- if(waitUtilDone){
+
+ if (waitUtilDone) {
waitAsyncLoadFinish();
}
- this.buffer = null;
}
private void putRecordToWriteQueue(BatchRecordBuffer buffer){
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
index a6835c6..4a04d78 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -21,15 +21,17 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
public class DorisSinkBatchExample {
- public static void main(String[] args) throws Exception{
+ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -56,11 +58,11 @@ public class DorisSinkBatchExample {
.setTableIdentifier("test.test_flink")
.setUsername("root")
.setPassword("");
- DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+ DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label")
.setStreamLoadProp(properties)
.setDeletable(false)
- .setBufferFlushMaxBytes(8*1024)
+ .setBufferFlushMaxBytes(8 * 1024)
.setBufferFlushMaxRows(900)
.setBufferFlushIntervalMs(1000 * 10);
@@ -71,15 +73,17 @@ public class DorisSinkBatchExample {
env.addSource(new SourceFunction<String>() {
private Long id = 0L;
+
@Override
public void run(SourceContext<String> out) throws Exception {
- while(true){
- id=id+1;
+ while (true) {
+ id = id + 1;
String record = id + "," + UUID.randomUUID() + "," + id +
"";
out.collect(record);
Thread.sleep(500);
}
}
+
@Override
public void cancel() {
@@ -88,4 +92,54 @@ public class DorisSinkBatchExample {
env.execute("doris batch test");
}
+
+ public void testBatchFlush() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
+ final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
+
+ readOptionBuilder.setDeserializeArrowAsync(false)
+ .setDeserializeQueueSize(64)
+ .setExecMemLimit(2147483648L)
+ .setRequestQueryTimeoutS(3600)
+ .setRequestBatchSize(1000)
+ .setRequestConnectTimeoutMs(10000)
+ .setRequestReadTimeoutMs(10000)
+ .setRequestRetries(3)
+ .setRequestTabletSize(1024 * 1024);
+
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("line_delimiter", "\n");
+ properties.setProperty("format", "csv");
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.testd")
+ .setUsername("root")
+ .setPassword("");
+
+ DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+
+ executionBuilder.setLabelPrefix("label")
+ .setStreamLoadProp(properties)
+ .setDeletable(false)
+ .setBufferFlushMaxBytes(8 * 1024)
+ .setBufferFlushMaxRows(1)
+ .setBufferFlushIntervalMs(1000 * 10);
+
+ builder.setDorisReadOptions(readOptionBuilder.build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(new SimpleStringSerializer())
+ .setDorisOptions(dorisBuilder.build());
+
+
+ DataStreamSource<String> stringDataStreamSource = env.fromCollection(
+ Arrays.asList("1,-74159.9193252453", "2,-74159.9193252453",
"3,-19.7004480979", "4,43385.2170333507", "5,-16.2602598554"));
+ stringDataStreamSource.sinkTo(builder.build());
+
+ env.execute("doris batch test");
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]