This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a362b5f8c [INLONG-5628][Sort] Add buffer limit of sink dispatch queue 
(#6380)
a362b5f8c is described below

commit a362b5f8cd79377e1e678a7e730dbb3518e0d70f
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Nov 23 19:16:08 2022 +0800

    [INLONG-5628][Sort] Add buffer limit of sink dispatch queue (#6380)
---
 .../standalone/utils/FlumeConfigGenerator.java     |  9 +++---
 .../inlong/sort/standalone/sink/SinkContext.java   | 24 +++++++++++----
 .../sink/elasticsearch/EsCallbackListener.java     |  1 +
 .../sink/elasticsearch/EsOutputChannel.java        |  2 +-
 .../sort/standalone/sink/elasticsearch/EsSink.java | 12 ++++----
 .../sink/elasticsearch/EsSinkContext.java          | 34 +++++++++++++++-------
 .../standalone/source/sortsdk/SortSdkSource.java   |  3 +-
 .../TestDefaultEvent2IndexRequestHandler.java      | 10 +++----
 .../sink/elasticsearch/TestEsCallbackListener.java |  8 ++---
 .../sink/elasticsearch/TestEsChannelWorker.java    |  6 ++--
 .../sink/elasticsearch/TestEsOutputChannel.java    | 12 ++++----
 .../sink/elasticsearch/TestEsSinkContext.java      | 21 ++++++-------
 12 files changed, 87 insertions(+), 55 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index ca3e1b102..76083ff08 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -34,6 +34,8 @@ public class FlumeConfigGenerator {
     public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
     public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
     public static final String KEY_SORT_INTERCEPTOR_TYPE = "interceptor.type";
+    public static final String DEFAULT_SORT_INTERCEPTOR_TYPE 
+        = 
"org.apache.inlong.sort.standalone.rollback.TimeBasedFilterInterceptor$Builder";
     public static final String KEY_ROLLBACK_START_TIME = "rollback.startTime";
     public static final String KEY_ROLLBACK_STOP_TIME = "rollback.stopTime";
 
@@ -138,8 +140,7 @@ public class FlumeConfigGenerator {
      */
     private static void appendSources(
             Map<String, String> flumeConf,
-            String name, Map<String,
-            String> sinkParams) {
+            String name, Map<String, String> sinkParams) {
         // sources
         String sourceName = name + "Source";
         flumeConf.put(name + ".sources", sourceName);
@@ -169,8 +170,8 @@ public class FlumeConfigGenerator {
         builder.setLength(0);
         String interceptorType = 
builder.append(prefix).append("interceptors.").append(interceptorName)
                 .append(".type").toString();
-        
Optional.ofNullable(CommonPropertiesHolder.getString(KEY_SORT_INTERCEPTOR_TYPE))
-                .map(type -> flumeConf.put(interceptorType, type));
+        flumeConf.put(interceptorType,
+                CommonPropertiesHolder.getString(KEY_SORT_INTERCEPTOR_TYPE, 
DEFAULT_SORT_INTERCEPTOR_TYPE));
         builder.setLength(0);
         String startTimeKey = 
builder.append(prefix).append("interceptors.").append(interceptorName).append(".")
                 .append(KEY_ROLLBACK_START_TIME).toString();
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 3f1222eb4..a7f9530f6 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -17,11 +17,6 @@
 
 package org.apache.inlong.sort.standalone.sink;
 
-import java.util.Date;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -32,9 +27,15 @@ import 
org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
+import java.util.Date;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
 /**
  * 
  * SinkContext
@@ -47,6 +48,8 @@ public class SinkContext {
     public static final String KEY_PROCESSINTERVAL = "processInterval";
     public static final String KEY_RELOADINTERVAL = "reloadInterval";
     public static final String KEY_TASK_NAME = "taskName";
+    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = 
"maxBufferQueueSizeKb";
+    public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
 
     protected final String clusterId;
     protected final String taskName;
@@ -237,4 +240,15 @@ public class SinkContext {
         dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
         dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
     }
+
+    /**
+     * createBufferQueue
+     * @return
+     */
+    public static <U> BufferQueue<U> createBufferQueue() {
+        int maxBufferQueueSizeKb = 
CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
+                DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
+        BufferQueue<U> dispatchQueue = new BufferQueue<>(maxBufferQueueSizeKb);
+        return dispatchQueue;
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
index 2e5419492..c149c279b 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
@@ -87,6 +87,7 @@ public class EsCallbackListener implements 
BulkProcessor.Listener {
                 context.backDispatchQueue(requestItem);
             } else {
                 context.addSendResultMetric(event, context.getTaskName(), 
true, sendTime);
+                context.releaseDispatchQueue(requestItem);
                 event.ack();
             }
         }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java
index 717d3853d..9b4208fb3 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java
@@ -201,7 +201,7 @@ public class EsOutputChannel extends Thread {
                 return;
             }
             // get indexRequest
-            indexRequest = context.taskDispatchQueue();
+            indexRequest = context.takeDispatchQueue();
             if (indexRequest == null) {
                 Thread.sleep(context.getProcessInterval());
                 return;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
index 35836d98d..97aa9314c 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
@@ -17,17 +17,18 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.flume.Context;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * EsSink
  */
@@ -36,7 +37,7 @@ public class EsSink extends AbstractSink implements 
Configurable {
     public static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
 
     private Context parentContext;
-    private final LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new 
LinkedBlockingQueue<>();
+    private BufferQueue<EsIndexRequest> dispatchQueue;
     private EsSinkContext context;
     // workers
     private List<EsChannelWorker> workers = new ArrayList<>();
@@ -50,6 +51,7 @@ public class EsSink extends AbstractSink implements 
Configurable {
     public void start() {
         super.start();
         try {
+            this.dispatchQueue = SinkContext.createBufferQueue();
             this.context = new EsSinkContext(getName(), parentContext, 
getChannel(), dispatchQueue);
             this.context.start();
             for (int i = 0; i < context.getMaxThreads(); i++) {
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 6f22a6876..611be5756 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -33,6 +33,7 @@ import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
@@ -42,7 +43,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -70,13 +70,13 @@ public class EsSinkContext extends SinkContext {
     public static final int DEFAULT_FLUSH_INTERVAL = 60;
     public static final int DEFAULT_CONCURRENT_REQUESTS = 5;
     public static final int DEFAULT_MAX_CONNECT = 10;
-    public static final int DEFAULT_KEYWORD_MAX_LENGTH = 32 * 1024 - 1;
+    public static final int DEFAULT_KEYWORD_MAX_LENGTH = 31 * 1024;
     public static final boolean DEFAULT_IS_USE_INDEX_ID = false;
 
     private Context sinkContext;
     private String nodeId;
     private Map<String, EsIdConfig> idConfigMap = new ConcurrentHashMap<>();
-    private final LinkedBlockingQueue<EsIndexRequest> dispatchQueue;
+    private final BufferQueue<EsIndexRequest> dispatchQueue;
     private AtomicLong offerCounter = new AtomicLong(0);
     private AtomicLong takeCounter = new AtomicLong(0);
     private AtomicLong backCounter = new AtomicLong(0);
@@ -103,7 +103,7 @@ public class EsSinkContext extends SinkContext {
      * @param dispatchQueue
      */
     public EsSinkContext(String sinkName, Context context, Channel channel,
-            LinkedBlockingQueue<EsIndexRequest> dispatchQueue) {
+            BufferQueue<EsIndexRequest> dispatchQueue) {
         super(sinkName, context, channel);
         this.sinkContext = context;
         this.dispatchQueue = dispatchQueue;
@@ -149,6 +149,7 @@ public class EsSinkContext extends SinkContext {
             this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL, 
DEFAULT_FLUSH_INTERVAL);
             this.concurrentRequests = 
sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS);
             this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT, 
DEFAULT_MAX_CONNECT);
+            this.keywordMaxLength = 
sinkContext.getInteger(KEY_KEYWORD_MAX_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH);
             this.isUseIndexId = sinkContext.getBoolean(KEY_IS_USE_INDEX_ID, 
DEFAULT_IS_USE_INDEX_ID);
             // http host
             this.strHttpHosts = sinkContext.getString(KEY_HTTP_HOSTS);
@@ -303,18 +304,19 @@ public class EsSinkContext extends SinkContext {
      * @param  indexRequest
      * @return
      */
-    public boolean offerDispatchQueue(EsIndexRequest indexRequest) {
+    public void offerDispatchQueue(EsIndexRequest indexRequest) {
         this.offerCounter.incrementAndGet();
-        return dispatchQueue.offer(indexRequest);
+        dispatchQueue.acquire(indexRequest.getEvent().getBody().length);
+        dispatchQueue.offer(indexRequest);
     }
 
     /**
-     * taskDispatchQueue
+     * takeDispatchQueue
      * 
      * @return
      */
-    public EsIndexRequest taskDispatchQueue() {
-        EsIndexRequest indexRequest = this.dispatchQueue.poll();
+    public EsIndexRequest takeDispatchQueue() {
+        EsIndexRequest indexRequest = this.dispatchQueue.pollRecord();
         if (indexRequest != null) {
             this.takeCounter.incrementAndGet();
         }
@@ -327,9 +329,19 @@ public class EsSinkContext extends SinkContext {
      * @param  indexRequest
      * @return
      */
-    public boolean backDispatchQueue(EsIndexRequest indexRequest) {
+    public void backDispatchQueue(EsIndexRequest indexRequest) {
         this.backCounter.incrementAndGet();
-        return dispatchQueue.offer(indexRequest);
+        dispatchQueue.offer(indexRequest);
+    }
+
+    /**
+     * releaseDispatchQueue
+     * 
+     * @param  indexRequest
+     * @return
+     */
+    public void releaseDispatchQueue(EsIndexRequest indexRequest) {
+        dispatchQueue.release(indexRequest.getEvent().getBody().length);
     }
 
     /**
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index aee6a11f7..34a6bae69 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -238,8 +238,9 @@ public final class SortSdkSource extends AbstractSource
      * @return Map
      */
     private Map<String, String> getSortClientConfigParameters() {
+        Map<String, String> sortSdkParams = new HashMap<>();
         Map<String, String> commonParams = 
CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
-        Map<String, String> sortSdkParams = new HashMap<>(commonParams);
+        sortSdkParams.putAll(commonParams);
         SortTaskConfig taskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
         if (taskConfig != null) {
             Map<String, String> sinkParams = taskConfig.getSinkParams();
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
index f492e40e6..2410b4f10 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
@@ -17,18 +17,18 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * 
  * TestDefaultEvent2IndexRequestHandler
@@ -45,7 +45,7 @@ public class TestDefaultEvent2IndexRequestHandler {
      */
     @Test
     public void test() throws Exception {
-        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new 
LinkedBlockingQueue<>();
+        BufferQueue<EsIndexRequest> dispatchQueue = 
SinkContext.createBufferQueue();
         EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
         ProfileEvent event = TestEsSinkContext.mockProfileEvent();
         String uid = event.getUid();
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
index 7c3e8393d..50f7b11ca 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.elasticsearch.action.DocWriteRequest.OpType;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -46,7 +46,7 @@ public class TestEsCallbackListener {
 
     @Before
     public void before() throws Exception {
-        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new 
LinkedBlockingQueue<>();
+        BufferQueue<EsIndexRequest> dispatchQueue = 
SinkContext.createBufferQueue();
         this.context = TestEsSinkContext.mock(dispatchQueue);
     }
 
@@ -74,7 +74,7 @@ public class TestEsCallbackListener {
     @Test
     public void testAfterSuccessBulk() throws Exception {
         // prepare
-        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new 
LinkedBlockingQueue<>();
+        BufferQueue<EsIndexRequest> dispatchQueue = 
SinkContext.createBufferQueue();
         EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
         ProfileEvent event = TestEsSinkContext.mockProfileEvent();
         EsIndexRequest indexRequest = 
context.createIndexRequestHandler().parse(context, event);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
index 5140337b0..eade97698 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.flume.Transaction;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,7 +47,7 @@ public class TestEsChannelWorker {
      */
     @Before
     public void before() throws Exception {
-        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new 
LinkedBlockingQueue<>();
+        BufferQueue<EsIndexRequest> dispatchQueue = 
SinkContext.createBufferQueue();
         this.context = TestEsSinkContext.mock(dispatchQueue);
     }
 
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
index 70b66905b..95b594def 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
@@ -17,12 +17,10 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import static org.mockito.ArgumentMatchers.any;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.elasticsearch.action.ActionListener;
 import 
org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import 
org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
@@ -42,6 +40,8 @@ import 
org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static org.mockito.ArgumentMatchers.any;
+
 /**
  * 
  * TestEsOutputChannel
@@ -109,12 +109,12 @@ public class TestEsOutputChannel {
      */
     @Test
     public void test() throws Exception {
-        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new 
LinkedBlockingQueue<>();
+        BufferQueue<EsIndexRequest> dispatchQueue = 
SinkContext.createBufferQueue();
         EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
         EsOutputChannel output = new EsOutputChannel(context);
         ProfileEvent event = TestEsSinkContext.mockProfileEvent();
         EsIndexRequest indexRequest = 
context.createIndexRequestHandler().parse(context, event);
-        dispatchQueue.add(indexRequest);
+        dispatchQueue.offer(indexRequest);
         output.init();
         output.send();
         output.close();
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 7e76dd9f0..8ae2df961 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -17,20 +17,14 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,6 +33,13 @@ import 
org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+
 /**
  * 
  * TestEsSinkContext
@@ -59,7 +60,7 @@ public class TestEsSinkContext {
      * @return EsSinkContext
      * @throws Exception exception
      */
-    public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> 
dispatchQueue) throws Exception {
+    public static EsSinkContext mock(BufferQueue<EsIndexRequest> 
dispatchQueue) throws Exception {
         PowerMockito.mockStatic(MetricRegister.class);
         PowerMockito.doNothing().when(MetricRegister.class, "register", any());
         Context context = CommonPropertiesHolder.getContext();
@@ -105,7 +106,7 @@ public class TestEsSinkContext {
      */
     @Test
     public void test() throws Exception {
-        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new 
LinkedBlockingQueue<>();
+        BufferQueue<EsIndexRequest> dispatchQueue = 
SinkContext.createBufferQueue();
         EsSinkContext context = mock(dispatchQueue);
         assertEquals(10, context.getBulkSizeMb());
     }

Reply via email to