This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 52cc84e  [Feature] add stream load batch write (#168)
52cc84e is described below

commit 52cc84ee2f9412403789d5e8def70eb55a697d28
Author: wudi <[email protected]>
AuthorDate: Mon Aug 7 10:14:29 2023 +0800

    [Feature] add stream load batch write (#168)
    
    At present, the Flink Connector can only write data to Doris when a
    checkpoint is triggered.
    This is not very friendly to two scenarios:
    1. Jobs that do not need to enable checkpointing.
    2. Jobs whose Flink ETL is complicated, which makes checkpoints very
    slow, but whose write performance should not be affected by that.
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |  88 ++++++-
 .../DorisBatchLoadException.java}                  |  31 ++-
 .../org/apache/doris/flink/sink/BackendUtil.java   |  67 +++++
 .../doris/flink/sink/batch/BatchRecordBuffer.java  | 142 ++++++++++
 .../doris/flink/sink/batch/DorisBatchSink.java     |  97 +++++++
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 292 +++++++++++++++++++++
 .../doris/flink/sink/batch/DorisBatchWriter.java   | 112 ++++++++
 .../doris/flink/sink/writer/DorisWriter.java       |  50 +---
 .../doris/flink/sink/writer/LabelGenerator.java    |   4 +
 .../doris/flink/table/DorisConfigOptions.java      |  32 +++
 .../flink/table/DorisDynamicTableFactory.java      |  20 ++
 .../doris/flink/table/DorisDynamicTableSink.java   |  24 +-
 .../apache/doris/flink/DorisSinkBatchExample.java  |  91 +++++++
 .../apache/doris/flink/sink/TestBackendUtil.java   |  58 ++++
 .../flink/sink/batch/TestBatchRecordBuffer.java    |  75 ++++++
 .../doris/flink/sink/writer/TestDorisWriter.java   |  34 ---
 16 files changed, 1113 insertions(+), 104 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 03adf19..a3f6c50 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -32,21 +32,30 @@ public class DorisExecutionOptions implements Serializable {
     public static final int DEFAULT_MAX_RETRY_TIMES = 1;
     private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
     private static final int DEFAULT_BUFFER_COUNT = 3;
+    //batch flush
+    private static final int DEFAULT_FLUSH_QUEUE_SIZE = 2;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 50000;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 10 * 1024 * 1024;
+    private static final long DEFAULT_BUFFER_FLUSH_INTERVAL_MS = 10 * 1000;
     private final int checkInterval;
     private final int maxRetries;
     private final int bufferSize;
     private final int bufferCount;
     private final String labelPrefix;
-
     /**
      * Properties for the StreamLoad.
      */
     private final Properties streamLoadProp;
-
     private final Boolean enableDelete;
-
     private final Boolean enable2PC;
 
+    //batch mode param
+    private int flushQueueSize;
+    private int bufferFlushMaxRows;
+    private int bufferFlushMaxBytes;
+    private long bufferFlushIntervalMs;
+    private boolean enableBatchMode;
+
     public DorisExecutionOptions(int checkInterval,
                                  int maxRetries,
                                  int bufferSize,
@@ -54,7 +63,12 @@ public class DorisExecutionOptions implements Serializable {
                                  String labelPrefix,
                                  Properties streamLoadProp,
                                  Boolean enableDelete,
-                                 Boolean enable2PC) {
+                                 Boolean enable2PC,
+                                 boolean enableBatchMode,
+                                 int flushQueueSize,
+                                 int bufferFlushMaxRows,
+                                 int bufferFlushMaxBytes,
+                                 long bufferFlushIntervalMs) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.checkInterval = checkInterval;
         this.maxRetries = maxRetries;
@@ -64,6 +78,12 @@ public class DorisExecutionOptions implements Serializable {
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
         this.enable2PC = enable2PC;
+
+        this.enableBatchMode = enableBatchMode;
+        this.flushQueueSize = flushQueueSize;
+        this.bufferFlushMaxRows = bufferFlushMaxRows;
+        this.bufferFlushMaxBytes = bufferFlushMaxBytes;
+        this.bufferFlushIntervalMs = bufferFlushIntervalMs;
     }
 
     public static Builder builder() {
@@ -119,6 +139,27 @@ public class DorisExecutionOptions implements Serializable 
{
     public Boolean enabled2PC() {
         return enable2PC;
     }
+
+    public int getFlushQueueSize() {
+        return flushQueueSize;
+    }
+
+    public int getBufferFlushMaxRows() {
+        return bufferFlushMaxRows;
+    }
+
+    public int getBufferFlushMaxBytes() {
+        return bufferFlushMaxBytes;
+    }
+
+    public long getBufferFlushIntervalMs() {
+        return bufferFlushIntervalMs;
+    }
+
+    public boolean enableBatchMode() {
+        return enableBatchMode;
+    }
+
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
@@ -130,9 +171,15 @@ public class DorisExecutionOptions implements Serializable 
{
         private String labelPrefix = "";
         private Properties streamLoadProp = new Properties();
         private boolean enableDelete = true;
-
         private boolean enable2PC = true;
 
+        private int flushQueueSize = DEFAULT_FLUSH_QUEUE_SIZE;
+        private int bufferFlushMaxRows = DEFAULT_BUFFER_FLUSH_MAX_ROWS;
+        private int bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
+        private long bufferFlushIntervalMs = DEFAULT_BUFFER_FLUSH_INTERVAL_MS;
+        private boolean enableBatchMode = false;
+
+
         public Builder setCheckInterval(Integer checkInterval) {
             this.checkInterval = checkInterval;
             return this;
@@ -173,10 +220,35 @@ public class DorisExecutionOptions implements 
Serializable {
             return this;
         }
 
-        public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(checkInterval, maxRetries, 
bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete, enable2PC);
+        public Builder enableBatchMode() {
+            this.enableBatchMode = true;
+            return this;
+        }
+
+        public Builder setFlushQueueSize(int flushQueueSize) {
+            this.flushQueueSize = flushQueueSize;
+            return this;
+        }
+
+        public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
+            Preconditions.checkState(bufferFlushIntervalMs >= 1000, 
"bufferFlushIntervalMs must be greater than or equal to 1 second");
+            this.bufferFlushIntervalMs = bufferFlushIntervalMs;
+            return this;
         }
-    }
 
+        public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
+            this.bufferFlushMaxRows = bufferFlushMaxRows;
+            return this;
+        }
 
+        public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
+            this.bufferFlushMaxBytes = bufferFlushMaxBytes;
+            return this;
+        }
+
+        public DorisExecutionOptions build() {
+            return new DorisExecutionOptions(checkInterval, maxRetries, 
bufferSize, bufferCount, labelPrefix,
+                    streamLoadProp, enableDelete, enable2PC, enableBatchMode, 
flushQueueSize, bufferFlushMaxRows, bufferFlushMaxBytes, bufferFlushIntervalMs);
+        }
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisBatchLoadException.java
similarity index 52%
copy from 
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
copy to 
flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisBatchLoadException.java
index d31e777..265b3d2 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisBatchLoadException.java
@@ -14,23 +14,32 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.sink.writer;
 
-import java.util.UUID;
+package org.apache.doris.flink.exception;
 
 /**
- * Generator label for stream load.
+ * Doris batch load run exception.
  */
-public class LabelGenerator {
-    private String labelPrefix;
-    private boolean enable2PC;
+public class DorisBatchLoadException extends RuntimeException {
+    public DorisBatchLoadException() {
+        super();
+    }
+
+    public DorisBatchLoadException(String message) {
+        super(message);
+    }
+
+    public DorisBatchLoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-    public LabelGenerator(String labelPrefix, boolean enable2PC) {
-        this.labelPrefix = labelPrefix;
-        this.enable2PC = enable2PC;
+    public DorisBatchLoadException(Throwable cause) {
+        super(cause);
     }
 
-    public String generateLabel(long chkId) {
-        return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + 
UUID.randomUUID();
+    protected DorisBatchLoadException(String message, Throwable cause,
+                                      boolean enableSuppression,
+                                      boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
new file mode 100644
index 0000000..701cad6
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.models.BackendV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+
+public class BackendUtil {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BackendUtil.class);
+    private final List<BackendV2.BackendRowV2> backends;
+    private long pos;
+
+    public BackendUtil(List<BackendV2.BackendRowV2> backends) {
+        this.backends = backends;
+        this.pos = 0;
+    }
+
+    public String getAvailableBackend() {
+        long tmp = pos + backends.size();
+        while (pos < tmp) {
+            BackendV2.BackendRowV2 backend = backends.get((int) (pos % 
backends.size()));
+            String res = backend.toBackendString();
+            if(tryHttpConnection(res)){
+                pos++;
+                return res;
+            }
+        }
+        throw new DorisRuntimeException("no available backend.");
+    }
+
+    public boolean tryHttpConnection(String backend) {
+        try {
+            backend = "http://" + backend;
+            URL url = new URL(backend);
+            HttpURLConnection co =  (HttpURLConnection) url.openConnection();
+            co.setConnectTimeout(60000);
+            co.connect();
+            co.disconnect();
+            return true;
+        } catch (Exception ex) {
+            LOG.warn("Failed to connect to backend:{}", backend, ex);
+            pos++;
+            return false;
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
new file mode 100644
index 0000000..99876bb
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.doris.flink.sink.writer.RecordBuffer;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * buffer to queue
+ */
+public class BatchRecordBuffer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchRecordBuffer.class);
+    public static final String LINE_SEPARATOR = "\n";
+    private String labelName;
+    private ByteBuffer buffer;
+    private byte[] lineDelimiter;
+    private int numOfRecords = 0;
+    private int bufferSizeBytes = 0;
+    private boolean loadBatchFirstRecord = true;
+
+    public BatchRecordBuffer(){}
+
+    public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
+        super();
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
+    public void insert(byte[] record) {
+        ensureCapacity(record.length);
+        if(loadBatchFirstRecord){
+            loadBatchFirstRecord = false;
+        } else {
+            this.buffer.put(this.lineDelimiter);
+        }
+        this.buffer.put(record);
+        setNumOfRecords(getNumOfRecords() + 1);
+        setBufferSizeBytes(getBufferSizeBytes() + record.length);
+    }
+
+    @VisibleForTesting
+    public void ensureCapacity(int length) {
+        if(buffer.remaining() >= length){
+            return;
+        }
+        int currentRemain = buffer.remaining();
+        int currentCapacity = buffer.capacity();
+
+        int target = buffer.remaining() + length;
+        int capacity = buffer.capacity();
+        //grow 512kb each time
+        target = Math.max(target, Math.min(capacity + 512 * 1024, capacity * 
2));
+        ByteBuffer tmp = ByteBuffer.allocate(target);
+        buffer.flip();
+        tmp.put(buffer);
+        buffer.clear();
+        buffer = tmp;
+        LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", 
length, currentRemain, currentCapacity, target);
+    }
+
+    public String getLabelName() {
+        return labelName;
+    }
+
+    public void setLabelName(String labelName) {
+        this.labelName = labelName;
+    }
+
+    /**
+     * @return true if buffer is empty
+     */
+    public boolean isEmpty() {
+        return numOfRecords == 0;
+    }
+
+    public ByteBuffer getData() {
+        //change mode
+        buffer.flip();
+        LOG.debug("flush buffer: {} records, {} 
bytes",getNumOfRecords(),getBufferSizeBytes());
+        return buffer;
+    }
+
+    public void clear(){
+        this.buffer.clear();
+        this.numOfRecords = 0;
+        this.bufferSizeBytes = 0;
+        this.labelName = null;
+        this.loadBatchFirstRecord = true;
+    }
+
+    public ByteBuffer getBuffer(){
+        return buffer;
+    }
+    /**
+     * @return Number of records in this buffer
+     */
+    public int getNumOfRecords() {
+        return numOfRecords;
+    }
+
+    /**
+     * @return Buffer size in bytes
+     */
+    public int getBufferSizeBytes() {
+        return bufferSizeBytes;
+    }
+
+    /**
+     * @param numOfRecords Updates number of records (Usually by 1)
+     */
+    public void setNumOfRecords(int numOfRecords) {
+        this.numOfRecords = numOfRecords;
+    }
+
+    /**
+     * @param bufferSizeBytes Updates sum of size of records present in this 
buffer (Bytes)
+     */
+    public void setBufferSizeBytes(int bufferSizeBytes) {
+        this.bufferSizeBytes = bufferSizeBytes;
+    }
+
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
new file mode 100644
index 0000000..2c578d4
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+public class DorisBatchSink<IN> implements Sink<IN> {
+    private final DorisOptions dorisOptions;
+    private final DorisReadOptions dorisReadOptions;
+    private final DorisExecutionOptions dorisExecutionOptions;
+    private final DorisRecordSerializer<IN> serializer;
+
+    public DorisBatchSink(DorisOptions dorisOptions,
+                          DorisReadOptions dorisReadOptions,
+                          DorisExecutionOptions dorisExecutionOptions,
+                          DorisRecordSerializer<IN> serializer) {
+        this.dorisOptions = dorisOptions;
+        this.dorisReadOptions = dorisReadOptions;
+        this.dorisExecutionOptions = dorisExecutionOptions;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public SinkWriter<IN> createWriter(InitContext initContext) throws 
IOException {
+        DorisBatchWriter<IN> dorisBatchWriter = new 
DorisBatchWriter<IN>(initContext, serializer, dorisOptions, dorisReadOptions, 
dorisExecutionOptions);
+        dorisBatchWriter.initializeLoad();
+        return dorisBatchWriter;
+    }
+
+    public static <IN> DorisBatchSink.Builder<IN> builder() {
+        return new DorisBatchSink.Builder<>();
+    }
+
+    /**
+     * build for DorisBatchSink.
+     * @param <IN> record type.
+     */
+    public static class Builder<IN> {
+        private DorisOptions dorisOptions;
+        private DorisReadOptions dorisReadOptions;
+        private DorisExecutionOptions dorisExecutionOptions;
+        private DorisRecordSerializer<IN> serializer;
+
+        public DorisBatchSink.Builder<IN> setDorisOptions(DorisOptions 
dorisOptions) {
+            this.dorisOptions = dorisOptions;
+            return this;
+        }
+
+        public DorisBatchSink.Builder<IN> setDorisReadOptions(DorisReadOptions 
dorisReadOptions) {
+            this.dorisReadOptions = dorisReadOptions;
+            return this;
+        }
+
+        public DorisBatchSink.Builder<IN> 
setDorisExecutionOptions(DorisExecutionOptions dorisExecutionOptions) {
+            this.dorisExecutionOptions = dorisExecutionOptions;
+            return this;
+        }
+
+        public DorisBatchSink.Builder<IN> 
setSerializer(DorisRecordSerializer<IN> serializer) {
+            this.serializer = serializer;
+            return this;
+        }
+
+        public DorisBatchSink<IN> build() {
+            Preconditions.checkNotNull(dorisOptions);
+            Preconditions.checkNotNull(dorisExecutionOptions);
+            Preconditions.checkNotNull(serializer);
+            if(dorisReadOptions == null) {
+                dorisReadOptions = DorisReadOptions.builder().build();
+            }
+            return new DorisBatchSink<>(dorisOptions, dorisReadOptions, 
dorisExecutionOptions, serializer);
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
new file mode 100644
index 0000000..8f2dcc1
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisBatchLoadException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.RespContent;
+import org.apache.doris.flink.sink.BackendUtil;
+import org.apache.doris.flink.sink.EscapeHandler;
+import org.apache.doris.flink.sink.HttpPutBuilder;
+import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+
+/**
+ * async stream load
+ **/
+public class DorisBatchStreamLoad implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisBatchStreamLoad.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final List<String> DORIS_SUCCESS_STATUS = new 
ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+    private final LabelGenerator labelGenerator;
+    private final byte[] lineDelimiter;
+    private static final String LOAD_URL_PATTERN = 
"http://%s/api/%s/%s/_stream_load";
+    private String loadUrl;
+    private String hostPort;
+    private final String username;
+    private final String password;
+    private final String db;
+    private final String table;
+    private final Properties loadProps;
+    private BatchRecordBuffer buffer;
+    private DorisExecutionOptions executionOptions;
+    private ExecutorService loadExecutorService;
+    private LoadAsyncExecutor loadAsyncExecutor;
+    private BlockingQueue<BatchRecordBuffer> writeQueue;
+    private BlockingQueue<BatchRecordBuffer> readQueue;
+    private final AtomicBoolean started;
+    private AtomicReference<Throwable> exception = new AtomicReference<>(null);
+    private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();
+    private BackendUtil backendUtil;
+
+    public DorisBatchStreamLoad(DorisOptions dorisOptions,
+                                DorisReadOptions dorisReadOptions,
+                                DorisExecutionOptions executionOptions,
+                                LabelGenerator labelGenerator) {
+        this.backendUtil = new 
BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
+        this.hostPort = backendUtil.getAvailableBackend();
+        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+        this.db = tableInfo[0];
+        this.table = tableInfo[1];
+        this.username = dorisOptions.getUsername();
+        this.password = dorisOptions.getPassword();
+        this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+        this.loadProps = executionOptions.getStreamLoadProp();
+        this.labelGenerator = labelGenerator;
+        this.lineDelimiter = 
EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT)).getBytes();
+        this.executionOptions = executionOptions;
+        //init queue
+        this.writeQueue = new 
ArrayBlockingQueue<>(executionOptions.getFlushQueueSize());
+        LOG.info("init RecordBuffer capacity {}, count {}", 
executionOptions.getBufferFlushMaxBytes(), 
executionOptions.getFlushQueueSize());
+        for (int index = 0; index < executionOptions.getFlushQueueSize(); 
index++) {
+            this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter, 
executionOptions.getBufferFlushMaxBytes()));
+        }
+        readQueue = new LinkedBlockingDeque<>();
+
+        this.loadAsyncExecutor= new LoadAsyncExecutor();
+        this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new 
DefaultThreadFactory("streamload-executor"), new 
ThreadPoolExecutor.AbortPolicy());
+        this.started = new AtomicBoolean(true);
+        this.loadExecutorService.execute(loadAsyncExecutor);
+    }
+
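+    /**
+     * Write a record into the current buffer, flushing once a size or row threshold is reached.
+     * @param record the serialized record bytes
+     * @throws InterruptedException if interrupted while queueing the buffer for loading
+     */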
+    public void writeRecord(byte[] record) throws InterruptedException {
+        checkFlushException();
+        if(buffer == null){
+            buffer = takeRecordFromWriteQueue();
+        }
+        buffer.insert(record);
+        //Flush once the buffer exceeds 80% of the max byte size, to avoid triggering 
ByteBuffer expansion
+        if (buffer.getBufferSizeBytes() >= 
executionOptions.getBufferFlushMaxBytes() * 0.8
+                || (executionOptions.getBufferFlushMaxRows() != 0 && 
buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
+            flush(false);
+        }
+    }
+
+    public void flush(boolean waitUtilDone) throws InterruptedException {
+        checkFlushException();
+        if (buffer == null) {
+            LOG.debug("buffer is empty, skip flush.");
+            return;
+        }
+        buffer.setLabelName(labelGenerator.generateBatchLabel());
+        BatchRecordBuffer tmpBuff = buffer;
+        readQueue.put(tmpBuff);
+        if(waitUtilDone){
+            waitAsyncLoadFinish();
+        }
+        this.buffer = null;
+    }
+
+    private void putRecordToWriteQueue(BatchRecordBuffer buffer){
+        try {
+            writeQueue.put(buffer);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to recycle a buffer to queue");
+        }
+    }
+
+    private BatchRecordBuffer takeRecordFromWriteQueue(){
+        checkFlushException();
+        try {
+            return writeQueue.take();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to take a buffer from queue");
+        }
+    }
+
+    private void checkFlushException() {
+        if (exception.get() != null) {
+            throw new DorisBatchLoadException(exception.get());
+        }
+    }
+
+    private void waitAsyncLoadFinish() throws InterruptedException {
+        for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){
+            BatchRecordBuffer empty = takeRecordFromWriteQueue();
+            readQueue.put(empty);
+        }
+    }
+
+    public void close(){
+        //close async executor
+        this.loadExecutorService.shutdown();
+        this.started.set(false);
+
+        //clear buffer
+        this.writeQueue.clear();
+        this.readQueue.clear();
+    }
+
+    class LoadAsyncExecutor implements Runnable {
+        @Override
+        public void run() {
+            LOG.info("LoadAsyncExecutor start");
+            while (started.get()) {
+                BatchRecordBuffer buffer = null;
+                try {
+                    buffer = readQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    if(buffer == null){
+                        continue;
+                    }
+                    if (buffer.getLabelName() != null) {
+                        load(buffer.getLabelName(), buffer);
+                    }
+                } catch (Exception e) {
+                    LOG.error("worker running error", e);
+                    exception.set(e);
+                    break;
+                } finally {
+                    //Recycle buffer to avoid writer thread blocking
+                    if(buffer != null){
+                        buffer.clear();
+                        putRecordToWriteQueue(buffer);
+                    }
+                }
+            }
+            LOG.info("LoadAsyncExecutor stop");
+        }
+
+        /**
+         * execute stream load
+         */
+        public void load(String label, BatchRecordBuffer buffer) throws 
IOException{
+            refreshLoadUrl();
+            ByteBuffer data = buffer.getData();
+            ByteArrayEntity entity = new ByteArrayEntity(data.array(), 
data.arrayOffset(), data.limit());
+            HttpPutBuilder putBuilder = new HttpPutBuilder();
+            putBuilder.setUrl(loadUrl)
+                    .baseAuth(username, password)
+                    .setLabel(label)
+                    .addCommonHeader()
+                    .setEntity(entity)
+                    .addHiddenColumns(executionOptions.getDeletable())
+                    .addProperties(executionOptions.getStreamLoadProp());
+
+            int retry = 0;
+            while (retry <= executionOptions.getMaxRetries()) {
+                LOG.info("stream load started for {} on host {}", label, 
hostPort);
+                try (CloseableHttpResponse response = 
httpClient.execute(putBuilder.build())) {
+                    int statusCode = response.getStatusLine().getStatusCode();
+                    if (statusCode == 200 && response.getEntity() != null) {
+                        String loadResult = 
EntityUtils.toString(response.getEntity());
+                        LOG.info("load Result {}", loadResult);
+                        RespContent respContent = 
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
+                        if 
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                            String errMsg = String.format("stream load error: 
%s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
+                            throw new DorisBatchLoadException(errMsg);
+                        }else{
+                            return;
+                        }
+                    }
+                    LOG.error("stream load failed with {}, reason {}, to 
retry", hostPort, response.getStatusLine().toString());
+                }catch (Exception ex){
+                    if (retry == executionOptions.getMaxRetries()) {
+                        throw new DorisBatchLoadException("stream load error: 
", ex);
+                    }
+                    LOG.error("stream load error with {}, to retry, cause by", 
hostPort, ex);
+
+                }
+                retry++;
+                // get available backend retry
+                refreshLoadUrl();
+                putBuilder.setUrl(loadUrl);
+            }
+        }
+
+        private void refreshLoadUrl(){
+            hostPort = backendUtil.getAvailableBackend();
+            loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+        }
+    }
+
+    static class DefaultThreadFactory implements ThreadFactory {
+        private static final AtomicInteger poolNumber = new AtomicInteger(1);
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        DefaultThreadFactory(String name) {
+            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + 
"-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, namePrefix + 
threadNumber.getAndIncrement());
+            t.setDaemon(false);
+            return t;
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
new file mode 100644
index 0000000..d4621c7
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SinkWriter} that serializes incoming records and hands them to a
+ * {@link DorisBatchStreamLoad}.
+ *
+ * <p>Data is flushed from two places: once {@link #initializeLoad()} has run, a single-threaded
+ * scheduler calls {@code batchStreamLoad.flush(false)} every {@code flushIntervalMs}
+ * milliseconds, and {@link #flush(boolean)} calls {@code batchStreamLoad.flush(true)}.
+ * A failure of the scheduled flush is stored and rethrown, wrapped in a
+ * {@link RuntimeException}, by the next {@link #write} or {@link #flush(boolean)} call.
+ *
+ * @param <IN> type of the records accepted by this writer
+ */
+public class DorisBatchWriter<IN> implements SinkWriter<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisBatchWriter.class);
+    private DorisBatchStreamLoad batchStreamLoad;
+    private final DorisOptions dorisOptions;
+    private final DorisReadOptions dorisReadOptions;
+    private final DorisExecutionOptions executionOptions;
+    private final String labelPrefix;
+    private final LabelGenerator labelGenerator;
+    private final long flushIntervalMs;
+    private final DorisRecordSerializer<IN> serializer;
+    private final transient ScheduledExecutorService scheduledExecutorService;
+    // Written by the scheduler thread, read by the thread calling write()/flush().
+    private transient volatile Exception flushException = null;
+
+    /**
+     * Creates the writer; no load is started until {@link #initializeLoad()} is called.
+     *
+     * @param initContext      sink context; its subtask id is appended to the label prefix
+     * @param serializer       converts records to the bytes passed to the stream load
+     * @param dorisOptions     connection options forwarded to the stream load
+     * @param dorisReadOptions read options forwarded to the stream load
+     * @param executionOptions supplies the label prefix and the flush interval
+     */
+    public DorisBatchWriter(Sink.InitContext initContext,
+                            DorisRecordSerializer<IN> serializer,
+                            DorisOptions dorisOptions,
+                            DorisReadOptions dorisReadOptions,
+                            DorisExecutionOptions executionOptions) {
+        LOG.info("labelPrefix {}", executionOptions.getLabelPrefix());
+        this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
+        this.labelGenerator = new LabelGenerator(labelPrefix, false);
+        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-flush-interval"));
+        this.serializer = serializer;
+        this.dorisOptions = dorisOptions;
+        this.dorisReadOptions = dorisReadOptions;
+        this.executionOptions = executionOptions;
+        this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+    }
+
+    /**
+     * Creates the {@link DorisBatchStreamLoad} and schedules the periodic flush.
+     *
+     * @throws IOException declared for callers; propagated from the stream load setup
+     */
+    public void initializeLoad() throws IOException {
+        this.batchStreamLoad = new DorisBatchStreamLoad(dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
+        // when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
+        scheduledExecutorService.scheduleWithFixedDelay(this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Periodic flush executed on the scheduler thread. Never throws: any failure is stored in
+     * {@code flushException} so that the writing thread can report it.
+     */
+    private void intervalFlush() {
+        try {
+            LOG.info("interval flush triggered.");
+            batchStreamLoad.flush(false);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag that was cleared when the exception was thrown.
+            Thread.currentThread().interrupt();
+            flushException = e;
+        } catch (Exception e) {
+            // An exception escaping a scheduled task silently cancels all later runs,
+            // so record it here and let checkFlushException() surface it.
+            flushException = e;
+        }
+    }
+
+    /**
+     * Serializes one record and passes it to the stream load. Records that serialize to
+     * {@code null} are dropped.
+     *
+     * @throws RuntimeException if an earlier interval flush failed
+     */
+    @Override
+    public void write(IN in, Context context) throws IOException, InterruptedException {
+        checkFlushException();
+        byte[] serialize = serializer.serialize(in);
+        if (Objects.isNull(serialize)) {
+            //ddl record
+            return;
+        }
+        batchStreamLoad.writeRecord(serialize);
+    }
+
+    /**
+     * Flush requested by the framework; delegates to {@code batchStreamLoad.flush(true)}.
+     *
+     * @param flush not used by this implementation
+     * @throws RuntimeException if an earlier interval flush failed
+     */
+    @Override
+    public void flush(boolean flush) throws IOException, InterruptedException {
+        checkFlushException();
+        LOG.info("checkpoint flush triggered.");
+        batchStreamLoad.flush(true);
+    }
+
+    /**
+     * Stops the flush scheduler and closes the stream load, if one was created.
+     */
+    @Override
+    public void close() throws Exception {
+        LOG.info("DorisBatchWriter Close");
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
+        // initializeLoad() may never have been called, e.g. when start-up failed early.
+        if (batchStreamLoad != null) {
+            batchStreamLoad.close();
+        }
+    }
+
+    /**
+     * Rethrows, wrapped in a {@link RuntimeException}, a failure recorded by the interval flush.
+     */
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to streamload failed.", flushException);
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 642e1d3..7890670 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -23,8 +23,8 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.RespContent;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -38,8 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -73,8 +71,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
-    private List<BackendV2.BackendRowV2> backends;
-    private long pos;
+    private BackendUtil backendUtil;
     private String currentLabel;
 
     public DorisWriter(Sink.InitContext initContext,
@@ -99,16 +96,14 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
         this.executionOptions = executionOptions;
         this.intervalTime = executionOptions.checkInterval();
         this.loading = false;
-        this.pos = 0;
     }
 
     public void initializeLoad(List<DorisWriterState> state) throws 
IOException {
         //cache backend
-        this.backends = RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG);
-        String backend = getAvailableBackend();
+        backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG));
         try {
             this.dorisStreamLoad = new DorisStreamLoad(
-                    backend,
+                    backendUtil.getAvailableBackend(),
                     dorisOptions,
                     executionOptions,
                     labelGenerator, new HttpUtil().getHttpClient());
@@ -168,7 +163,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     public List<DorisWriterState> snapshotState(long checkpointId) throws 
IOException {
         Preconditions.checkState(dorisStreamLoad != null);
         // dynamic refresh BE node
-        this.dorisStreamLoad.setHostPort(getAvailableBackend());
+        this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
         this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
         return Collections.singletonList(dorisWriterState);
     }
@@ -221,11 +216,6 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
         this.dorisStreamLoad = streamLoad;
     }
 
-    @VisibleForTesting
-    public void setBackends(List<BackendV2.BackendRowV2> backends) {
-        this.backends = backends;
-    }
-
     @Override
     public void close() throws Exception {
         if (scheduledExecutorService != null) {
@@ -235,34 +225,4 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
             dorisStreamLoad.close();
         }
     }
-
-    @VisibleForTesting
-    public String getAvailableBackend() {
-        long tmp = pos + backends.size();
-        while (pos < tmp) {
-            BackendV2.BackendRowV2 backend = backends.get((int) (pos % 
backends.size()));
-            String res = backend.toBackendString();
-            if(tryHttpConnection(res)){
-                pos++;
-                return res;
-            }
-        }
-        throw new DorisRuntimeException("no available backend.");
-    }
-
-    public boolean tryHttpConnection(String backend) {
-        try {
-            backend = "http://"; + backend;
-            URL url = new URL(backend);
-            HttpURLConnection co =  (HttpURLConnection) url.openConnection();
-            co.setConnectTimeout(60000);
-            co.connect();
-            co.disconnect();
-            return true;
-        } catch (Exception ex) {
-            LOG.warn("Failed to connect to backend:{}", backend, ex);
-            pos++;
-            return false;
-        }
-    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index d31e777..35edae9 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -33,4 +33,8 @@ public class LabelGenerator {
     public String generateLabel(long chkId) {
         return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + 
UUID.randomUUID();
     }
+
+    /**
+     * Builds a label made of the label prefix, an underscore and a freshly generated
+     * random UUID.
+     *
+     * @return the generated label
+     */
+    public String generateBatchLabel() {
+        final UUID uniqueSuffix = UUID.randomUUID();
+        return labelPrefix + "_" + uniqueSuffix;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 3e129aa..91cc148 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -195,6 +195,38 @@ public class DorisConfigOptions {
 
     public static final ConfigOption<Integer> SINK_PARALLELISM = 
FactoryUtil.SINK_PARALLELISM;
 
+
+    /** Boolean switch for batch write mode ({@code sink.enable.batch-mode}); off by default. */
+    public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE = 
ConfigOptions
+            .key("sink.enable.batch-mode")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to enable batch write mode");
+
+    /** Queue length used by the asynchronous stream load ({@code sink.flush.queue-size}); defaults to 2. */
+    public static final ConfigOption<Integer> SINK_FLUSH_QUEUE_SIZE = 
ConfigOptions
+            .key("sink.flush.queue-size")
+            .intType()
+            .defaultValue(2)
+            .withDescription("Queue length for async stream load, default is 
2");
+
+    /**
+     * Maximum number of rows per flushed batch ({@code sink.buffer-flush.max-rows}); defaults to
+     * 50000 (the "5w" in the description is presumably shorthand for that value).
+     */
+    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = 
ConfigOptions
+            .key("sink.buffer-flush.max-rows")
+            .intType()
+            .defaultValue(50000)
+            .withDescription("The maximum number of flush items in each batch, 
the default is 5w");
+
+    /** Maximum number of bytes per flushed batch ({@code sink.buffer-flush.max-bytes}); defaults to 10 * 1024 * 1024. */
+    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_BYTES = 
ConfigOptions
+            .key("sink.buffer-flush.max-bytes")
+            .intType()
+            .defaultValue(10 * 1024 * 1024)
+            .withDescription("The maximum number of bytes flushed in each 
batch, the default is 10MB");
+
+    /** Flush interval as a {@link Duration} ({@code sink.buffer-flush.interval}); defaults to 10 seconds. */
+    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = 
ConfigOptions
+            .key("sink.buffer-flush.interval")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(10))
+            .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
+                    "default value is 10s.");
+
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 81c756c..e6cbb1d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -59,10 +59,15 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_READ_T
 import static 
org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
@@ -130,6 +135,12 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_BUFFER_COUNT);
         options.add(SINK_PARALLELISM);
 
+        options.add(SINK_ENABLE_BATCH_MODE);
+        options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        options.add(SINK_BUFFER_FLUSH_MAX_BYTES);
+        options.add(SINK_FLUSH_QUEUE_SIZE);
+        options.add(SINK_BUFFER_FLUSH_INTERVAL);
+
         options.add(SOURCE_USE_OLD_API);
         return options;
     }
@@ -195,6 +206,15 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         if (!readableConfig.get(SINK_ENABLE_2PC)) {
             builder.disable2PC();
         }
+
+        if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
+            builder.enableBatchMode();
+        }
+
+        builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
+        
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
+        
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
         return builder.build();
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index ae4e137..be13fea 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -22,12 +22,14 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.writer.RowDataSerializer;
 
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
@@ -97,12 +99,22 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
                 .setType(loadProperties.getProperty(FORMAT_KEY, CSV))
                 .enableDelete(deletable)
                 
.setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, 
FIELD_DELIMITER_DEFAULT));
-        DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
-        dorisSinkBuilder.setDorisOptions(options)
-                .setDorisReadOptions(readOptions)
-                .setDorisExecutionOptions(executionOptions)
-                .setSerializer(serializerBuilder.build());
-        return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
+
+        if(!executionOptions.enableBatchMode()){
+            DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
+            dorisSinkBuilder.setDorisOptions(options)
+                    .setDorisReadOptions(readOptions)
+                    .setDorisExecutionOptions(executionOptions)
+                    .setSerializer(serializerBuilder.build());
+            return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
+        }else{
+            DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = 
DorisBatchSink.builder();
+            dorisBatchSinkBuilder.setDorisOptions(options)
+                    .setDorisReadOptions(readOptions)
+                    .setDorisExecutionOptions(executionOptions)
+                    .setSerializer(serializerBuilder.build());
+            return SinkV2Provider.of(dorisBatchSinkBuilder.build(), 
sinkParallelism);
+        }
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
new file mode 100644
index 0000000..a6835c6
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkBatchExample {
+    public static void main(String[] args) throws Exception{
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.enableCheckpointing(5000);
+//        
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 
Time.milliseconds(30000)));
+        DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = 
DorisReadOptions.builder();
+        readOptionBuilder.setDeserializeArrowAsync(false)
+                .setDeserializeQueueSize(64)
+                .setExecMemLimit(2147483648L)
+                .setRequestQueryTimeoutS(3600)
+                .setRequestBatchSize(1000)
+                .setRequestConnectTimeoutMs(10000)
+                .setRequestReadTimeoutMs(10000)
+                .setRequestRetries(3)
+                .setRequestTabletSize(1024 * 1024);
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.test_flink")
+                .setUsername("root")
+                .setPassword("");
+        DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix("label")
+                .setStreamLoadProp(properties)
+                .setDeletable(false)
+                .setBufferFlushMaxBytes(8*1024)
+                .setBufferFlushMaxRows(900)
+                .setBufferFlushIntervalMs(1000 * 10);
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new SimpleStringSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+        env.addSource(new SourceFunction<String>() {
+            private Long id = 0L;
+            @Override
+            public void run(SourceContext<String> out) throws Exception {
+                while(true){
+                    id=id+1;
+                    String record = id + "," + UUID.randomUUID() + "," + id + 
"";
+                    out.collect(record);
+                    Thread.sleep(500);
+                }
+            }
+            @Override
+            public void cancel() {
+
+            }
+        }).sinkTo(builder.build());
+
+        env.execute("doris batch test");
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
new file mode 100644
index 0000000..2704af6
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.doris.flink.rest.models.BackendV2;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests for {@link BackendUtil}. The class carries {@code @Ignore}, so JUnit skips it.
+ */
+@Ignore
+public class TestBackendUtil {
+
+    /** Three backends are expected to be handed out in list order, then selection wraps around. */
+    @Test
+    public void testGetAvailableBackend() throws Exception {
+        final List<BackendV2.BackendRowV2> candidates = Arrays.asList(
+                newBackend("127.0.0.1", 8040),
+                newBackend("127.0.0.2", 8040),
+                newBackend("127.0.0.3", 8040));
+        final BackendUtil util = new BackendUtil(candidates);
+        for (int expectedIndex : new int[] {0, 1, 2, 0}) {
+            Assert.assertEquals(candidates.get(expectedIndex).toBackendString(), util.getAvailableBackend());
+        }
+    }
+
+    /** A connection attempt to {@code 127.0.0.1:8040} is expected to be reported as failed. */
+    @Test
+    public void testTryHttpConnection() {
+        final BackendUtil util = new BackendUtil(new ArrayList<>());
+        Assert.assertFalse(util.tryHttpConnection("127.0.0.1:8040"));
+    }
+
+    /** Creates a backend row with the given ip and http port. */
+    private BackendV2.BackendRowV2 newBackend(String host, int port) {
+        final BackendV2.BackendRowV2 row = new BackendV2.BackendRowV2();
+        row.setIp(host);
+        row.setHttpPort(port);
+        return row;
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
new file mode 100644
index 0000000..18cb79a
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Tests for {@link BatchRecordBuffer}: record/byte accounting on insert and capacity growth.
+ */
+public class TestBatchRecordBuffer {
+
+    /** Inserts records, checks the counters and the delimiter-joined payload, then reuses the buffer. */
+    @Test
+    public void testInsert() {
+        BatchRecordBuffer recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8), 1);
+        recordBuffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
+
+        Assert.assertEquals(1, recordBuffer.getNumOfRecords());
+        Assert.assertEquals("doris,1".getBytes(StandardCharsets.UTF_8).length, recordBuffer.getBufferSizeBytes());
+
+        recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(2, recordBuffer.getNumOfRecords());
+        // The expected size is the joined payload minus the delimiter bytes.
+        Assert.assertEquals("doris,1\ndoris,2".getBytes(StandardCharsets.UTF_8).length - "\n".getBytes(StandardCharsets.UTF_8).length, recordBuffer.getBufferSizeBytes());
+
+        ByteBuffer data = recordBuffer.getData();
+        // Decode with the same charset the records were encoded with, not the platform default.
+        Assert.assertEquals("doris,1\ndoris,2", new String(data.array(), data.arrayOffset(), data.limit(), StandardCharsets.UTF_8));
+
+        //mock flush
+        recordBuffer.clear();
+        recordBuffer.insert("doris,3".getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(1, recordBuffer.getNumOfRecords());
+        Assert.assertEquals("doris,3".getBytes(StandardCharsets.UTF_8).length, recordBuffer.getBufferSizeBytes());
+    }
+
+    /** Checks the buffer capacity after successive {@code ensureCapacity} calls. */
+    @Test
+    public void testGrowCapacity() {
+        BatchRecordBuffer recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8), 1);
+        recordBuffer.ensureCapacity(10);
+
+        // assertEquals takes (expected, actual); keeping that order gives correct failure messages.
+        Assert.assertEquals(10 + 1, recordBuffer.getBuffer().capacity());
+
+        recordBuffer.ensureCapacity(100);
+        Assert.assertEquals(100 + 11, recordBuffer.getBuffer().capacity());
+
+        recordBuffer.ensureCapacity(1024);
+        Assert.assertEquals(1024 + 111, recordBuffer.getBuffer().capacity());
+
+        recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8), 16);
+        recordBuffer.ensureCapacity(16);
+        Assert.assertEquals(16, recordBuffer.getBuffer().capacity());
+
+        recordBuffer.insert("1234567890".getBytes(StandardCharsets.UTF_8));
+        recordBuffer.ensureCapacity(8);
+        Assert.assertEquals(32, recordBuffer.getBuffer().capacity());
+    }
+
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index a45261b..9e44336 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.sink.writer;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
@@ -32,7 +31,6 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
@@ -95,36 +93,4 @@ public class TestDorisWriter {
         Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
         Assert.assertTrue(dorisWriter.isLoading());
     }
-
-    @Test
-    public void testGetAvailableBackend() throws Exception{
-        Sink.InitContext initContext = mock(Sink.InitContext.class);
-        DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, 
Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, 
readOptions, executionOptions);
-        List<BackendV2.BackendRowV2> backends = Arrays.asList(
-                newBackend("127.0.0.1", 8040),
-                newBackend("127.0.0.2", 8040),
-                newBackend("127.0.0.3", 8040));
-        dorisWriter.setBackends(backends);
-        Assert.assertEquals(backends.get(0).toBackendString(), 
dorisWriter.getAvailableBackend());
-        Assert.assertEquals(backends.get(1).toBackendString(), 
dorisWriter.getAvailableBackend());
-        Assert.assertEquals(backends.get(2).toBackendString(), 
dorisWriter.getAvailableBackend());
-        Assert.assertEquals(backends.get(0).toBackendString(), 
dorisWriter.getAvailableBackend());
-    }
-
-    @Test
-    public void testTryHttpConnection(){
-        Sink.InitContext initContext = mock(Sink.InitContext.class);
-        DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, 
Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, 
readOptions, executionOptions);
-        boolean flag = dorisWriter.tryHttpConnection("127.0.0.1:8040");
-        Assert.assertFalse(flag);
-    }
-
-    private BackendV2.BackendRowV2 newBackend(String host, int port){
-        BackendV2.BackendRowV2 backend = new BackendV2.BackendRowV2();
-        backend.setIp(host);
-        backend.setHttpPort(port);
-        return backend;
-    }
-
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to