This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e7d791  [fix] fix batch sink flush lost batch (#180)
2e7d791 is described below

commit 2e7d7914393acf7a69d7596120f1be10694adef9
Author: bingquanzhao <[email protected]>
AuthorDate: Mon Aug 21 10:30:58 2023 +0800

    [fix] fix batch sink flush lost batch (#180)
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 15 +++--
 .../apache/doris/flink/DorisSinkBatchExample.java  | 64 ++++++++++++++++++++--
 2 files changed, 66 insertions(+), 13 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 1fe217d..a43220c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -139,17 +139,16 @@ public class DorisBatchStreamLoad implements Serializable {
 
     public synchronized void flush(boolean waitUtilDone) throws InterruptedException {
         checkFlushException();
-        if (buffer == null) {
-            LOG.debug("buffer is empty, skip flush.");
-            return;
+        if (buffer != null && !buffer.isEmpty()) {
+            buffer.setLabelName(labelGenerator.generateBatchLabel());
+            BatchRecordBuffer tmpBuff = buffer;
+            readQueue.put(tmpBuff);
+            this.buffer = null;
         }
-        buffer.setLabelName(labelGenerator.generateBatchLabel());
-        BatchRecordBuffer tmpBuff = buffer;
-        readQueue.put(tmpBuff);
-        if(waitUtilDone){
+
+        if (waitUtilDone) {
             waitAsyncLoadFinish();
         }
-        this.buffer = null;
     }
 
     private void putRecordToWriteQueue(BatchRecordBuffer buffer){
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
index a6835c6..4a04d78 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -21,15 +21,17 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
+import java.util.Arrays;
 import java.util.Properties;
 import java.util.UUID;
 
 
 public class DorisSinkBatchExample {
-    public static void main(String[] args) throws Exception{
+    public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
 //        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -56,11 +58,11 @@ public class DorisSinkBatchExample {
                 .setTableIdentifier("test.test_flink")
                 .setUsername("root")
                 .setPassword("");
-        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
         executionBuilder.setLabelPrefix("label")
                 .setStreamLoadProp(properties)
                 .setDeletable(false)
-                .setBufferFlushMaxBytes(8*1024)
+                .setBufferFlushMaxBytes(8 * 1024)
                 .setBufferFlushMaxRows(900)
                 .setBufferFlushIntervalMs(1000 * 10);
 
@@ -71,15 +73,17 @@ public class DorisSinkBatchExample {
 
         env.addSource(new SourceFunction<String>() {
             private Long id = 0L;
+
             @Override
             public void run(SourceContext<String> out) throws Exception {
-                while(true){
-                    id=id+1;
+                while (true) {
+                    id = id + 1;
                     String record = id + "," + UUID.randomUUID() + "," + id + "";
                     out.collect(record);
                     Thread.sleep(500);
                 }
             }
+
             @Override
             public void cancel() {
 
@@ -88,4 +92,54 @@ public class DorisSinkBatchExample {
 
         env.execute("doris batch test");
     }
+
+    public void testBatchFlush() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+
+        readOptionBuilder.setDeserializeArrowAsync(false)
+                .setDeserializeQueueSize(64)
+                .setExecMemLimit(2147483648L)
+                .setRequestQueryTimeoutS(3600)
+                .setRequestBatchSize(1000)
+                .setRequestConnectTimeoutMs(10000)
+                .setRequestReadTimeoutMs(10000)
+                .setRequestRetries(3)
+                .setRequestTabletSize(1024 * 1024);
+
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.testd")
+                .setUsername("root")
+                .setPassword("");
+
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+
+        executionBuilder.setLabelPrefix("label")
+                .setStreamLoadProp(properties)
+                .setDeletable(false)
+                .setBufferFlushMaxBytes(8 * 1024)
+                .setBufferFlushMaxRows(1)
+                .setBufferFlushIntervalMs(1000 * 10);
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new SimpleStringSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+
+        DataStreamSource<String> stringDataStreamSource = env.fromCollection(
+                Arrays.asList("1,-74159.9193252453", "2,-74159.9193252453", "3,-19.7004480979", "4,43385.2170333507", "5,-16.2602598554"));
+        stringDataStreamSource.sinkTo(builder.build());
+
+        env.execute("doris batch test");
+
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to