This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a362b5f8c [INLONG-5628][Sort] Add buffer limit of sink dispatch queue (#6380)
a362b5f8c is described below
commit a362b5f8cd79377e1e678a7e730dbb3518e0d70f
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Nov 23 19:16:08 2022 +0800
[INLONG-5628][Sort] Add buffer limit of sink dispatch queue (#6380)
---
.../standalone/utils/FlumeConfigGenerator.java | 9 +++---
.../inlong/sort/standalone/sink/SinkContext.java | 24 +++++++++++----
.../sink/elasticsearch/EsCallbackListener.java | 1 +
.../sink/elasticsearch/EsOutputChannel.java | 2 +-
.../sort/standalone/sink/elasticsearch/EsSink.java | 12 ++++----
.../sink/elasticsearch/EsSinkContext.java | 34 +++++++++++++++-------
.../standalone/source/sortsdk/SortSdkSource.java | 3 +-
.../TestDefaultEvent2IndexRequestHandler.java | 10 +++----
.../sink/elasticsearch/TestEsCallbackListener.java | 8 ++---
.../sink/elasticsearch/TestEsChannelWorker.java | 6 ++--
.../sink/elasticsearch/TestEsOutputChannel.java | 12 ++++----
.../sink/elasticsearch/TestEsSinkContext.java | 21 ++++++-------
12 files changed, 87 insertions(+), 55 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index ca3e1b102..76083ff08 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -34,6 +34,8 @@ public class FlumeConfigGenerator {
public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
public static final String KEY_SORT_INTERCEPTOR_TYPE = "interceptor.type";
+ public static final String DEFAULT_SORT_INTERCEPTOR_TYPE
+ = "org.apache.inlong.sort.standalone.rollback.TimeBasedFilterInterceptor$Builder";
public static final String KEY_ROLLBACK_START_TIME = "rollback.startTime";
public static final String KEY_ROLLBACK_STOP_TIME = "rollback.stopTime";
@@ -138,8 +140,7 @@ public class FlumeConfigGenerator {
*/
private static void appendSources(
Map<String, String> flumeConf,
- String name, Map<String,
- String> sinkParams) {
+ String name, Map<String, String> sinkParams) {
// sources
String sourceName = name + "Source";
flumeConf.put(name + ".sources", sourceName);
@@ -169,8 +170,8 @@ public class FlumeConfigGenerator {
builder.setLength(0);
String interceptorType =
builder.append(prefix).append("interceptors.").append(interceptorName)
.append(".type").toString();
- Optional.ofNullable(CommonPropertiesHolder.getString(KEY_SORT_INTERCEPTOR_TYPE))
- .map(type -> flumeConf.put(interceptorType, type));
+ flumeConf.put(interceptorType,
+ CommonPropertiesHolder.getString(KEY_SORT_INTERCEPTOR_TYPE, DEFAULT_SORT_INTERCEPTOR_TYPE));
builder.setLength(0);
String startTimeKey =
builder.append(prefix).append("interceptors.").append(interceptorName).append(".")
.append(KEY_ROLLBACK_START_TIME).toString();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 3f1222eb4..a7f9530f6 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -17,11 +17,6 @@
package org.apache.inlong.sort.standalone.sink;
-import java.util.Date;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -32,9 +27,15 @@ import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
+import java.util.Date;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
/**
*
* SinkContext
@@ -47,6 +48,8 @@ public class SinkContext {
public static final String KEY_PROCESSINTERVAL = "processInterval";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
public static final String KEY_TASK_NAME = "taskName";
+ public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
+ public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
protected final String clusterId;
protected final String taskName;
@@ -237,4 +240,15 @@ public class SinkContext {
dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
}
+
+ /**
+ * createBufferQueue
+ * @return
+ */
+ public static <U> BufferQueue<U> createBufferQueue() {
+ int maxBufferQueueSizeKb = CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
+ DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
+ BufferQueue<U> dispatchQueue = new BufferQueue<>(maxBufferQueueSizeKb);
+ return dispatchQueue;
+ }
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
index 2e5419492..c149c279b 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
@@ -87,6 +87,7 @@ public class EsCallbackListener implements BulkProcessor.Listener {
context.backDispatchQueue(requestItem);
} else {
context.addSendResultMetric(event, context.getTaskName(),
true, sendTime);
+ context.releaseDispatchQueue(requestItem);
event.ack();
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java
index 717d3853d..9b4208fb3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsOutputChannel.java
@@ -201,7 +201,7 @@ public class EsOutputChannel extends Thread {
return;
}
// get indexRequest
- indexRequest = context.taskDispatchQueue();
+ indexRequest = context.takeDispatchQueue();
if (indexRequest == null) {
Thread.sleep(context.getProcessInterval());
return;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
index 35836d98d..97aa9314c 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
@@ -17,17 +17,18 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* EsSink
*/
@@ -36,7 +37,7 @@ public class EsSink extends AbstractSink implements Configurable {
public static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private Context parentContext;
- private final LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
+ private BufferQueue<EsIndexRequest> dispatchQueue;
private EsSinkContext context;
// workers
private List<EsChannelWorker> workers = new ArrayList<>();
@@ -50,6 +51,7 @@ public class EsSink extends AbstractSink implements Configurable {
public void start() {
super.start();
try {
+ this.dispatchQueue = SinkContext.createBufferQueue();
this.context = new EsSinkContext(getName(), parentContext,
getChannel(), dispatchQueue);
this.context.start();
for (int i = 0; i < context.getMaxThreads(); i++) {
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 6f22a6876..611be5756 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -33,6 +33,7 @@ import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
@@ -42,7 +43,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -70,13 +70,13 @@ public class EsSinkContext extends SinkContext {
public static final int DEFAULT_FLUSH_INTERVAL = 60;
public static final int DEFAULT_CONCURRENT_REQUESTS = 5;
public static final int DEFAULT_MAX_CONNECT = 10;
- public static final int DEFAULT_KEYWORD_MAX_LENGTH = 32 * 1024 - 1;
+ public static final int DEFAULT_KEYWORD_MAX_LENGTH = 31 * 1024;
public static final boolean DEFAULT_IS_USE_INDEX_ID = false;
private Context sinkContext;
private String nodeId;
private Map<String, EsIdConfig> idConfigMap = new ConcurrentHashMap<>();
- private final LinkedBlockingQueue<EsIndexRequest> dispatchQueue;
+ private final BufferQueue<EsIndexRequest> dispatchQueue;
private AtomicLong offerCounter = new AtomicLong(0);
private AtomicLong takeCounter = new AtomicLong(0);
private AtomicLong backCounter = new AtomicLong(0);
@@ -103,7 +103,7 @@ public class EsSinkContext extends SinkContext {
* @param dispatchQueue
*/
public EsSinkContext(String sinkName, Context context, Channel channel,
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue) {
+ BufferQueue<EsIndexRequest> dispatchQueue) {
super(sinkName, context, channel);
this.sinkContext = context;
this.dispatchQueue = dispatchQueue;
@@ -149,6 +149,7 @@ public class EsSinkContext extends SinkContext {
this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL,
DEFAULT_FLUSH_INTERVAL);
this.concurrentRequests =
sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS);
this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT,
DEFAULT_MAX_CONNECT);
+ this.keywordMaxLength = sinkContext.getInteger(KEY_KEYWORD_MAX_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH);
this.isUseIndexId = sinkContext.getBoolean(KEY_IS_USE_INDEX_ID,
DEFAULT_IS_USE_INDEX_ID);
// http host
this.strHttpHosts = sinkContext.getString(KEY_HTTP_HOSTS);
@@ -303,18 +304,19 @@ public class EsSinkContext extends SinkContext {
* @param indexRequest
* @return
*/
- public boolean offerDispatchQueue(EsIndexRequest indexRequest) {
+ public void offerDispatchQueue(EsIndexRequest indexRequest) {
this.offerCounter.incrementAndGet();
- return dispatchQueue.offer(indexRequest);
+ dispatchQueue.acquire(indexRequest.getEvent().getBody().length);
+ dispatchQueue.offer(indexRequest);
}
/**
- * taskDispatchQueue
+ * takeDispatchQueue
*
* @return
*/
- public EsIndexRequest taskDispatchQueue() {
- EsIndexRequest indexRequest = this.dispatchQueue.poll();
+ public EsIndexRequest takeDispatchQueue() {
+ EsIndexRequest indexRequest = this.dispatchQueue.pollRecord();
if (indexRequest != null) {
this.takeCounter.incrementAndGet();
}
@@ -327,9 +329,19 @@ public class EsSinkContext extends SinkContext {
* @param indexRequest
* @return
*/
- public boolean backDispatchQueue(EsIndexRequest indexRequest) {
+ public void backDispatchQueue(EsIndexRequest indexRequest) {
this.backCounter.incrementAndGet();
- return dispatchQueue.offer(indexRequest);
+ dispatchQueue.offer(indexRequest);
+ }
+
+ /**
+ * releaseDispatchQueue
+ *
+ * @param indexRequest
+ * @return
+ */
+ public void releaseDispatchQueue(EsIndexRequest indexRequest) {
+ dispatchQueue.release(indexRequest.getEvent().getBody().length);
}
/**
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index aee6a11f7..34a6bae69 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -238,8 +238,9 @@ public final class SortSdkSource extends AbstractSource
* @return Map
*/
private Map<String, String> getSortClientConfigParameters() {
+ Map<String, String> sortSdkParams = new HashMap<>();
Map<String, String> commonParams =
CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
- Map<String, String> sortSdkParams = new HashMap<>(commonParams);
+ sortSdkParams.putAll(commonParams);
SortTaskConfig taskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
if (taskConfig != null) {
Map<String, String> sinkParams = taskConfig.getSinkParams();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
index f492e40e6..2410b4f10 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
@@ -17,18 +17,18 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import static org.junit.Assert.assertEquals;
+
/**
*
* TestDefaultEvent2IndexRequestHandler
@@ -45,7 +45,7 @@ public class TestDefaultEvent2IndexRequestHandler {
*/
@Test
public void test() throws Exception {
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new
LinkedBlockingQueue<>();
+ BufferQueue<EsIndexRequest> dispatchQueue =
SinkContext.createBufferQueue();
EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
ProfileEvent event = TestEsSinkContext.mockProfileEvent();
String uid = event.getUid();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
index 7c3e8393d..50f7b11ca 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
@@ -17,10 +17,10 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -46,7 +46,7 @@ public class TestEsCallbackListener {
@Before
public void before() throws Exception {
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new
LinkedBlockingQueue<>();
+ BufferQueue<EsIndexRequest> dispatchQueue =
SinkContext.createBufferQueue();
this.context = TestEsSinkContext.mock(dispatchQueue);
}
@@ -74,7 +74,7 @@ public class TestEsCallbackListener {
@Test
public void testAfterSuccessBulk() throws Exception {
// prepare
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new
LinkedBlockingQueue<>();
+ BufferQueue<EsIndexRequest> dispatchQueue =
SinkContext.createBufferQueue();
EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
ProfileEvent event = TestEsSinkContext.mockProfileEvent();
EsIndexRequest indexRequest =
context.createIndexRequestHandler().parse(context, event);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
index 5140337b0..eade97698 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
@@ -17,11 +17,11 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.flume.Transaction;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -47,7 +47,7 @@ public class TestEsChannelWorker {
*/
@Before
public void before() throws Exception {
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new
LinkedBlockingQueue<>();
+ BufferQueue<EsIndexRequest> dispatchQueue =
SinkContext.createBufferQueue();
this.context = TestEsSinkContext.mock(dispatchQueue);
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
index 70b66905b..95b594def 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
@@ -17,12 +17,10 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import static org.mockito.ArgumentMatchers.any;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.elasticsearch.action.ActionListener;
import
org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import
org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
@@ -42,6 +40,8 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import static org.mockito.ArgumentMatchers.any;
+
/**
*
* TestEsOutputChannel
@@ -109,12 +109,12 @@ public class TestEsOutputChannel {
*/
@Test
public void test() throws Exception {
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new
LinkedBlockingQueue<>();
+ BufferQueue<EsIndexRequest> dispatchQueue =
SinkContext.createBufferQueue();
EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
EsOutputChannel output = new EsOutputChannel(context);
ProfileEvent event = TestEsSinkContext.mockProfileEvent();
EsIndexRequest indexRequest =
context.createIndexRequestHandler().parse(context, event);
- dispatchQueue.add(indexRequest);
+ dispatchQueue.offer(indexRequest);
output.init();
output.send();
output.close();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 7e76dd9f0..8ae2df961 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -17,20 +17,14 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -39,6 +33,13 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+
/**
*
* TestEsSinkContext
@@ -59,7 +60,7 @@ public class TestEsSinkContext {
* @return EsSinkContext
* @throws Exception exception
*/
- public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest>
dispatchQueue) throws Exception {
+ public static EsSinkContext mock(BufferQueue<EsIndexRequest>
dispatchQueue) throws Exception {
PowerMockito.mockStatic(MetricRegister.class);
PowerMockito.doNothing().when(MetricRegister.class, "register", any());
Context context = CommonPropertiesHolder.getContext();
@@ -105,7 +106,7 @@ public class TestEsSinkContext {
*/
@Test
public void test() throws Exception {
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new
LinkedBlockingQueue<>();
+ BufferQueue<EsIndexRequest> dispatchQueue =
SinkContext.createBufferQueue();
EsSinkContext context = mock(dispatchQueue);
assertEquals(10, context.getBulkSizeMb());
}