This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c3bdf56a92 [INLONG-9308][Agent] The sink end of the file instance
supports sending data with different streamIds (#9309)
c3bdf56a92 is described below
commit c3bdf56a92d57a992e4639dbeb1f19e32c2d14d4
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Nov 20 15:50:05 2023 +0800
[INLONG-9308][Agent] The sink end of the file instance supports sending
data with different streamIds (#9309)
---
.../org/apache/inlong/agent/conf/TaskProfile.java | 10 +-
.../{PackageAckInfo.java => OffsetAckInfo.java} | 7 +-
.../agent/message/filecollect/ProxyMessage.java | 100 +++++++++++++
.../message/filecollect/ProxyMessageCache.java | 72 +++++-----
.../agent/message/filecollect/SenderMessage.java | 2 +-
.../apache/inlong/agent/utils/DateTransUtils.java | 10 +-
.../agent/plugin/sinks/filecollect/ProxySink.java | 155 ++++++++++++++-------
.../plugin/sinks/filecollect/SenderManager.java | 95 +------------
.../inlong/agent/plugin/sources/LogFileSource.java | 8 +-
.../agent/plugin/task/filecollect/WatchEntity.java | 2 +-
.../agent/plugin/utils/file/NewDateUtils.java | 6 +-
.../sinks/filecollect/TestSenderManager.java | 42 +++---
12 files changed, 295 insertions(+), 214 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index be9b8cd1f3..cbad21e499 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -110,7 +110,8 @@ public class TaskProfile extends AbstractConfiguration {
return hasKey(TaskConstants.TASK_ID) &&
hasKey(TaskConstants.TASK_SOURCE)
&& hasKey(TaskConstants.TASK_SINK) &&
hasKey(TaskConstants.TASK_CHANNEL)
&& hasKey(TaskConstants.TASK_GROUP_ID) &&
hasKey(TaskConstants.TASK_STREAM_ID)
- && hasKey(TaskConstants.TASK_CYCLE_UNIT);
+ && hasKey(TaskConstants.TASK_CYCLE_UNIT)
+ && hasKey(TaskConstants.TASK_FILE_TIME_ZONE);
}
public String toJsonStr() {
@@ -125,10 +126,13 @@ public class TaskProfile extends AbstractConfiguration {
instanceProfile.setSourceDataTime(dataTime);
Long sinkDataTime = 0L;
try {
- sinkDataTime = DateTransUtils.timeStrConvertTomillSec(dataTime,
getCycleUnit(),
+ sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime,
getCycleUnit(),
TimeZone.getTimeZone(getTimeZone()));
} catch (ParseException e) {
- logger.error("createInstanceProfile error: ", e);
+ logger.error("createInstanceProfile ParseException error: ", e);
+ return null;
+ } catch (Exception e) {
+ logger.error("createInstanceProfile Exception error: ", e);
return null;
}
instanceProfile.setSinkDataTime(sinkDataTime);
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java
similarity index 88%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java
index 6efdbbdc1e..f6637955bc 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java
@@ -19,15 +19,12 @@ package org.apache.inlong.agent.message.filecollect;
import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
-@NoArgsConstructor
-public class PackageAckInfo {
+public class OffsetAckInfo {
- private Long index;
private Long offset;
- private Integer len;
+ private int len;
private Boolean hasAck;
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
new file mode 100644
index 0000000000..7d9f4930ac
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.message.filecollect;
+
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.plugin.Message;
+
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+
+/**
+ * Bus message with body, header, inlongGroupId and inlongStreamId.
+ */
+public class ProxyMessage implements Message {
+
+ private static final String DEFAULT_INLONG_STREAM_ID = "__";
+
+ private final byte[] body;
+ private final Map<String, String> header;
+ private final String inlongGroupId;
+ private final String inlongStreamId;
+ // determine the group key when making batch
+ private final String batchKey;
+ private final String dataKey;
+ OffsetAckInfo ackInfo;
+
+ public ProxyMessage(byte[] body, Map<String, String> header) {
+ this.body = body;
+ this.header = header;
+ this.inlongGroupId = header.get(PROXY_KEY_GROUP_ID);
+ this.inlongStreamId = header.getOrDefault(PROXY_KEY_STREAM_ID,
DEFAULT_INLONG_STREAM_ID);
+ this.dataKey = header.getOrDefault(PROXY_KEY_DATA, "");
+ // use the batch key of user and inlongStreamId to determine one batch
+ this.batchKey = dataKey + inlongStreamId;
+ Long offset = Long.parseLong(header.get(TaskConstants.OFFSET));
+ ackInfo = new OffsetAckInfo(offset, body.length, false);
+ }
+
+ public ProxyMessage(Message message) {
+ this(message.getBody(), message.getHeader());
+ }
+
+ public String getDataKey() {
+ return dataKey;
+ }
+
+ /**
+ * Get first line of body list
+ *
+ * @return first line of body list
+ */
+ @Override
+ public byte[] getBody() {
+ return body;
+ }
+
+ public OffsetAckInfo getAckInfo() {
+ return ackInfo;
+ }
+
+ /**
+ * Get header of message
+ *
+ * @return header
+ */
+ @Override
+ public Map<String, String> getHeader() {
+ return header;
+ }
+
+ public String getInlongGroupId() {
+ return inlongGroupId;
+ }
+
+ public String getInlongStreamId() {
+ return inlongStreamId;
+ }
+
+ public String getBatchKey() {
+ return batchKey;
+ }
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 7e2d3c8b8a..9c0c84e0ef 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -18,8 +18,6 @@
package org.apache.inlong.agent.message.filecollect;
import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.msg.AttributeConstants;
@@ -32,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -51,19 +50,17 @@ public class ProxyMessageCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProxyMessageCache.class);
- private final String groupId;
- private final String streamId;
private final String taskId;
private final String instanceId;
private final int maxPackSize;
private final int maxQueueNumber;
- private final String inodeInfo;
+ private final String groupId;
// ms
private final int cacheTimeout;
// streamId -> list of proxyMessage
- private final LinkedBlockingQueue<ProxyMessage> messageQueue;
+ private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>>
messageQueueMap;
+ // private final LinkedBlockingQueue<ProxyMessage> messageQueue;
private final AtomicLong cacheSize = new AtomicLong(0);
- private Long packageIndex = 0L;
private long lastPrintTime = 0;
private long dataTime;
/**
@@ -74,16 +71,15 @@ public class ProxyMessageCache {
public ProxyMessageCache(InstanceProfile instanceProfile, String groupId,
String streamId) {
this.taskId = instanceProfile.getTaskId();
this.instanceId = instanceProfile.getInstanceId();
+ this.groupId = groupId;
this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
this.maxQueueNumber =
instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
this.cacheTimeout =
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS,
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
- this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
- this.groupId = groupId;
- this.streamId = streamId;
- this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
+ messageQueueMap = new ConcurrentHashMap<>();
+ // this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
try {
- dataTime =
DateTransUtils.timeStrConvertTomillSec(instanceProfile.getSourceDataTime(),
+ dataTime =
DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
instanceProfile.get(TASK_CYCLE_UNIT));
} catch (ParseException e) {
LOGGER.info("trans dataTime error", e);
@@ -101,22 +97,19 @@ public class ProxyMessageCache {
*
* @return true if is nearly full else false.
*/
- private boolean queueIsFull() {
+ private boolean queueIsFull(LinkedBlockingQueue<ProxyMessage>
messageQueue) {
return messageQueue.size() >= maxQueueNumber - 1;
}
/**
* Add proxy message to cache, proxy message should belong to the same
stream id.
*/
- public boolean addProxyMessage(ProxyMessage message) {
- assert streamId.equals(message.getInlongStreamId());
+ public boolean add(ProxyMessage message) {
+ String streamId = message.getInlongStreamId();
+ LinkedBlockingQueue<ProxyMessage> messageQueue =
makeSureQueueExist(streamId);
try {
- if (queueIsFull()) {
- if (AgentUtils.getCurrentTime() - lastPrintTime >
TimeUnit.SECONDS.toMillis(1)) {
- lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.warn("message queue is greater than {}, stop adding
message, "
- + "maybe proxy get stuck", maxQueueNumber);
- }
+ if (queueIsFull(messageQueue)) {
+ printQueueFull();
return false;
}
messageQueue.put(message);
@@ -128,11 +121,25 @@ public class ProxyMessageCache {
return false;
}
- /**
- * check message queue is empty or not
- */
- public boolean isEmpty() {
- return messageQueue.isEmpty();
+ private void printQueueFull() {
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
TimeUnit.SECONDS.toMillis(1)) {
+ lastPrintTime = AgentUtils.getCurrentTime();
+ LOGGER.warn("message queue is greater than {}, stop adding
message, "
+ + "maybe proxy get stuck", maxQueueNumber);
+ }
+ }
+
+ public ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>>
getMessageQueueMap() {
+ return messageQueueMap;
+ }
+
+ private LinkedBlockingQueue<ProxyMessage> makeSureQueueExist(String
streamId) {
+ LinkedBlockingQueue<ProxyMessage> messageQueue =
messageQueueMap.get(streamId);
+ if (messageQueue == null) {
+ messageQueue = new LinkedBlockingQueue<>();
+ messageQueueMap.put(streamId, messageQueue);
+ }
+ return messageQueue;
}
/**
@@ -140,10 +147,10 @@ public class ProxyMessageCache {
*
* @return map of message list, key is stream id for the batch; return
null if there are no valid messages.
*/
- public SenderMessage fetchSenderMessage() {
+ public SenderMessage fetchSenderMessage(String streamId,
LinkedBlockingQueue<ProxyMessage> messageQueue) {
int resultBatchSize = 0;
List<byte[]> bodyList = new ArrayList<>();
- Long packageOffset = TaskConstants.DEFAULT_OFFSET;
+ List<OffsetAckInfo> offsetList = new ArrayList<>();
while (!messageQueue.isEmpty()) {
// pre check message size
ProxyMessage peekMessage = messageQueue.peek();
@@ -164,17 +171,12 @@ public class ProxyMessageCache {
// decrease queue size.
cacheSize.addAndGet(-bodySize);
bodyList.add(message.getBody());
- Long newOffset =
Long.parseLong(message.getHeader().get(TaskConstants.OFFSET));
- if (packageOffset < newOffset) {
- packageOffset = newOffset;
- }
+ offsetList.add(message.getAckInfo());
}
// make sure result is not empty.
if (!bodyList.isEmpty()) {
- PackageAckInfo ackInfo = new PackageAckInfo(packageIndex,
packageOffset, resultBatchSize, false);
SenderMessage senderMessage = new SenderMessage(taskId,
instanceId, groupId, streamId, bodyList,
- dataTime, extraMap, ackInfo);
- packageIndex++;
+ dataTime, extraMap, offsetList);
return senderMessage;
}
return null;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
index a49005e8b3..da3579ca82 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
@@ -43,7 +43,7 @@ public class SenderMessage {
private List<byte[]> dataList;
private long dataTime;
private Map<String, String> extraMap;
- private PackageAckInfo ackInfo;
+ private List<OffsetAckInfo> offsetAckList;
public InLongMsg getInLongMsg() {
InLongMsg message = InLongMsg.newInLongMsg(true);
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index 6a79222a2a..2aa08742e8 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -36,12 +36,12 @@ public class DateTransUtils {
}
// convert YYYMMDD to millSec by cycleUnit
- public static long timeStrConvertTomillSec(String time, String cycleUnit)
+ public static long timeStrConvertToMillSec(String time, String cycleUnit)
throws ParseException {
- return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault());
+ return timeStrConvertToMillSec(time, cycleUnit, TimeZone.getDefault());
}
- public static long timeStrConvertTomillSec(String time, String cycleUnit,
TimeZone timeZone)
+ public static long timeStrConvertToMillSec(String time, String cycleUnit,
TimeZone timeZone)
throws ParseException {
long retTime = 0;
SimpleDateFormat df = null;
@@ -62,9 +62,6 @@ public class DateTransUtils {
try {
df.setTimeZone(timeZone);
retTime = df.parse(time).getTime();
- if (cycleUnit.equals("10m")) {
-
- }
} catch (ParseException e) {
logger.error("convert time string error. ", e);
}
@@ -98,7 +95,6 @@ public class DateTransUtils {
retTime = df.format(dateTime);
if (cycleUnit.contains("m")) {
-
int cycleNum = Integer.parseInt(cycleUnit.substring(0,
cycleUnit.length() - 1));
int mmTime = Integer.parseInt(retTime.substring(
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 922400025e..ad4f07258a 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -19,10 +19,13 @@ package org.apache.inlong.agent.plugin.sinks.filecollect;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.file.MemoryManager;
import org.apache.inlong.agent.message.EndMessage;
-import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.message.filecollect.OffsetAckInfo;
+import org.apache.inlong.agent.message.filecollect.ProxyMessage;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
@@ -33,12 +36,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
/**
* sink message data to inlong-dataproxy
@@ -48,8 +58,7 @@ public class ProxySink extends AbstractSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProxySink.class);
private final int WRITE_FAILED_WAIT_TIME_MS = 10;
private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
- private final Integer NO_WRITE_WAIT_AT_LEAST_MS = 5 * 1000;
- private final Integer SINK_FINISH_AT_LEAST_COUNT = 5;
+ public final int SAVE_OFFSET_INTERVAL_MS = 1000;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
@@ -61,8 +70,11 @@ public class ProxySink extends AbstractSink {
private volatile boolean shutdown = false;
private volatile boolean running = false;
private volatile boolean inited = false;
- private volatile long lastWriteTime = 0;
- private volatile long checkSinkFinishCount = 0;
+ private long lastPrintTime = 0;
+ private List<OffsetAckInfo> ackInfoList = new ArrayList<>();
+ private final ReentrantReadWriteLock packageAckInfoLock = new
ReentrantReadWriteLock(true);
+ private volatile boolean offsetRunning = false;
+ private OffsetManager offsetManager;
public ProxySink() {
}
@@ -71,7 +83,6 @@ public class ProxySink extends AbstractSink {
public void write(Message message) {
boolean suc = false;
while (!shutdown && !suc) {
- lastWriteTime = AgentUtils.getCurrentTime();
suc = putInCache(message);
if (!suc) {
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
@@ -84,8 +95,6 @@ public class ProxySink extends AbstractSink {
if (message == null) {
return true;
}
- message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID,
inlongGroupId);
- message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
extractStreamFromMessage(message, fieldSplitter);
if (message instanceof EndMessage) {
// increment the count of failed sinks
@@ -101,8 +110,10 @@ public class ProxySink extends AbstractSink {
}
cache.generateExtraMap(proxyMessage.getDataKey());
// add message to package proxy
- boolean suc = cache.addProxyMessage(proxyMessage);
- if (!suc) {
+ boolean suc = cache.add(proxyMessage);
+ if (suc) {
+ addAckInfo(proxyMessage.getAckInfo());
+ } else {
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
message.getBody().length);
// increment the count of failed sinks
sinkMetric.sinkFailCount.incrementAndGet();
@@ -123,8 +134,6 @@ public class ProxySink extends AbstractSink {
if (messageFilter != null) {
message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
messageFilter.filterStreamId(message, fieldSplitter));
- } else {
- message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
}
}
@@ -139,50 +148,46 @@ public class ProxySink extends AbstractSink {
"flushCache-" + profile.getTaskId() + "-" +
profile.getInstanceId());
LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
running = true;
- long lastPrintTime = AgentUtils.getCurrentTime();
while (!shutdown) {
- try {
- SenderMessage senderMessage = cache.fetchSenderMessage();
- if (senderMessage != null) {
- checkSinkFinishCount = 0;
- senderManager.sendBatch(senderMessage);
- if (AgentUtils.getCurrentTime() - lastPrintTime >
TimeUnit.SECONDS.toMillis(1)) {
- lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("send groupId {}, streamId {}, message
size {}, taskId {}, "
- + "instanceId {} sendTime is {}",
inlongGroupId, inlongStreamId,
- senderMessage.getDataList().size(),
profile.getTaskId(),
- profile.getInstanceId(),
- senderMessage.getDataTime());
- }
- }
- if (noWriteLongEnough() && senderManager.sendFinished()) {
- checkSinkFinishCount++;
- } else {
- checkSinkFinishCount = 0;
- }
- } catch (Exception ex) {
- LOGGER.error("error caught", ex);
- } catch (Throwable t) {
- ThreadUtils.threadThrowableHandler(Thread.currentThread(),
t);
- } finally {
- AgentUtils.silenceSleepInMs(batchFlushInterval);
- }
+ sendMessageFromCache();
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
}
LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
running = false;
};
}
+ public void sendMessageFromCache() {
+ ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>>
messageQueueMap = cache.getMessageQueueMap();
+ for (Map.Entry<String, LinkedBlockingQueue<ProxyMessage>> entry :
messageQueueMap.entrySet()) {
+ SenderMessage senderMessage =
cache.fetchSenderMessage(entry.getKey(), entry.getValue());
+ if (senderMessage == null) {
+ continue;
+ }
+ senderManager.sendBatch(senderMessage);
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
TimeUnit.SECONDS.toMillis(1)) {
+ lastPrintTime = AgentUtils.getCurrentTime();
+ LOGGER.info("send groupId {}, streamId {}, message size {},
taskId {}, "
+ + "instanceId {} sendTime is {}", inlongGroupId,
inlongStreamId,
+ senderMessage.getDataList().size(),
profile.getTaskId(),
+ profile.getInstanceId(),
+ senderMessage.getDataTime());
+ }
+ }
+ }
+
@Override
public void init(InstanceProfile profile) {
super.init(profile);
fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER,
DEFAULT_FIELD_SPLITTER).getBytes(
StandardCharsets.UTF_8);
sourceName = profile.getInstanceId();
+ offsetManager = OffsetManager.init();
senderManager = new SenderManager(profile, inlongGroupId, sourceName);
try {
senderManager.Start();
EXECUTOR_SERVICE.execute(coreThread());
+ EXECUTOR_SERVICE.execute(flushOffset());
inited = true;
} catch (Throwable ex) {
shutdown = true;
@@ -199,11 +204,11 @@ public class ProxySink extends AbstractSink {
return;
}
shutdown = true;
- while (running) {
+ while (running || offsetRunning) {
AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
}
- MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int)
cache.getCacheSize());
senderManager.Stop();
+ clearOffset();
LOGGER.info("destroy sink {} end", sourceName);
}
@@ -212,18 +217,70 @@ public class ProxySink extends AbstractSink {
*/
@Override
public boolean sinkFinish() {
- if (noWriteLongEnough() && sinkFinishLongEnough()) {
- return true;
- } else {
- return false;
+ boolean finished = false;
+ packageAckInfoLock.writeLock().lock();
+ if (ackInfoList.isEmpty()) {
+ finished = true;
}
+ packageAckInfoLock.writeLock().unlock();
+ return finished;
}
- public boolean noWriteLongEnough() {
- return AgentUtils.getCurrentTime() - lastWriteTime >
NO_WRITE_WAIT_AT_LEAST_MS;
+ private void addAckInfo(OffsetAckInfo info) {
+ packageAckInfoLock.writeLock().lock();
+ ackInfoList.add(info);
+ packageAckInfoLock.writeLock().unlock();
}
- public boolean sinkFinishLongEnough() {
- return checkSinkFinishCount > SINK_FINISH_AT_LEAST_COUNT;
+ /**
+ * flushOffset
+ *
+ * @return thread runner
+ */
+ private Runnable flushOffset() {
+ return () -> {
+ AgentThreadFactory.nameThread(
+ "flushOffset-" + profile.getTaskId() + "-" +
profile.getInstanceId());
+ LOGGER.info("start flush offset {}:{}", inlongGroupId, sourceName);
+ offsetRunning = true;
+ while (!shutdown) {
+ doFlushOffset();
+ AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
+ }
+ LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourceName);
+ offsetRunning = false;
+ };
+ }
+
+ /**
+ * flushOffset
+ */
+ private void doFlushOffset() {
+ packageAckInfoLock.writeLock().lock();
+ OffsetAckInfo info = null;
+ for (int i = 0; i < ackInfoList.size();) {
+ if (ackInfoList.get(i).getHasAck()) {
+ info = ackInfoList.remove(i);
+
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
+ } else {
+ break;
+ }
+ }
+ if (info != null) {
+ LOGGER.info("save offset {} taskId {} instanceId {}",
info.getOffset(), profile.getTaskId(),
+ profile.getInstanceId());
+ OffsetProfile offsetProfile = new
OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
+ info.getOffset(), profile.get(INODE_INFO));
+ offsetManager.setOffset(offsetProfile);
+ }
+ packageAckInfoLock.writeLock().unlock();
+ }
+
+ private void clearOffset() {
+ packageAckInfoLock.writeLock().lock();
+ for (int i = 0; i < ackInfoList.size();) {
+ MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
ackInfoList.remove(i).getLen());
+ }
+ packageAckInfoLock.writeLock().unlock();
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index bea53a1e9d..45fe9bc63e 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -20,11 +20,7 @@ package org.apache.inlong.agent.plugin.sinks.filecollect;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.OffsetManager;
-import org.apache.inlong.agent.core.task.file.MemoryManager;
-import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -44,7 +40,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,19 +48,15 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND;
-import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
@@ -78,8 +69,6 @@ public class SenderManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID =
SequentialID.getInstance();
- public final int SAVE_OFFSET_INTERVAL_MS = 1000;
- private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
// cache for group and sender list, share the map cross agent lifecycle.
private DefaultMessageSender sender;
private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
@@ -113,16 +102,12 @@ public class SenderManager {
// metric
private AgentMetricItemSet metricItemSet;
private Map<String, String> dimensions;
- private OffsetManager offsetManager;
private int ioThreadNum;
private boolean enableBusyWait;
private String authSecretId;
private String authSecretKey;
protected int batchFlushInterval;
- private List<PackageAckInfo> packageAckInfoList = new ArrayList<>();
- private final ReentrantReadWriteLock packageAckInfoLock = new
ReentrantReadWriteLock(true);
protected InstanceProfile profile;
- private volatile boolean offsetRunning = false;
private volatile boolean resendRunning = false;
private volatile boolean started = false;
@@ -155,7 +140,6 @@ public class SenderManager {
retrySleepTime = profile.getLong(
CommonConstants.PROXY_RETRY_SLEEP,
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE,
CommonConstants.DEFAULT_IS_FILE);
- offsetManager = OffsetManager.init();
ioThreadNum =
profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait =
profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -178,7 +162,6 @@ public class SenderManager {
public void Start() throws Exception {
createMessageSender(inlongGroupId);
- EXECUTOR_SERVICE.execute(flushOffset());
EXECUTOR_SERVICE.execute(flushResendQueue());
started = true;
}
@@ -189,11 +172,10 @@ public class SenderManager {
if (!started) {
return;
}
- while (offsetRunning || resendRunning) {
+ while (resendRunning) {
AgentUtils.silenceSleepInMs(1);
}
closeMessageSender();
- clearOffset();
LOGGER.info("stop send manager end");
}
@@ -247,60 +229,11 @@ public class SenderManager {
while (!shutdown && !resendQueue.isEmpty()) {
AgentUtils.silenceSleepInMs(retrySleepTime);
}
- addAckInfo(message.getAckInfo());
if (!shutdown) {
sendBatchWithRetryCount(message, 0);
}
}
- private void addAckInfo(PackageAckInfo info) {
- packageAckInfoLock.writeLock().lock();
- packageAckInfoList.add(info);
- packageAckInfoLock.writeLock().unlock();
- }
-
- public boolean sendFinished() {
- boolean finished = false;
- packageAckInfoLock.writeLock().lock();
- if (packageAckInfoList.isEmpty()) {
- finished = true;
- }
- packageAckInfoLock.writeLock().unlock();
- return finished;
- }
-
- /**
- * flushOffset
- */
- private void doFlushOffset() {
- packageAckInfoLock.writeLock().lock();
- PackageAckInfo info = null;
- for (int i = 0; i < packageAckInfoList.size();) {
- if (packageAckInfoList.get(i).getHasAck()) {
- info = packageAckInfoList.remove(i);
-
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
- } else {
- break;
- }
- }
- if (info != null) {
- LOGGER.info("save offset {} taskId {} instanceId {}",
info.getOffset(), profile.getTaskId(),
- profile.getInstanceId());
- OffsetProfile offsetProfile = new
OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
- info.getOffset(), profile.get(INODE_INFO));
- offsetManager.setOffset(offsetProfile);
- }
- packageAckInfoLock.writeLock().unlock();
- }
-
- private void clearOffset() {
- packageAckInfoLock.writeLock().lock();
- for (int i = 0; i < packageAckInfoList.size();) {
- MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
packageAckInfoList.remove(i).getLen());
- }
- packageAckInfoLock.writeLock().unlock();
- }
-
/**
* Send message to proxy by batch, use message cache.
*/
@@ -369,26 +302,6 @@ public class SenderManager {
};
}
- /**
- * flushOffset
- *
- * @return thread runner
- */
- private Runnable flushOffset() {
- return () -> {
- AgentThreadFactory.nameThread(
- "flushOffset-" + profile.getTaskId() + "-" +
profile.getInstanceId());
- LOGGER.info("start flush offset {}:{}", inlongGroupId, sourcePath);
- offsetRunning = true;
- while (!shutdown) {
- doFlushOffset();
- AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
- }
- LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourcePath);
- offsetRunning = false;
- };
- }
-
/**
* put the data into resend queue and will be resent later.
*
@@ -402,6 +315,10 @@ public class SenderManager {
}
}
+ public boolean sendFinished() {
+ return true;
+ }
+
/**
* sender callback
*/
@@ -425,7 +342,7 @@ public class SenderManager {
String instanceId = message.getInstanceId();
long dataTime = message.getDataTime();
if (result != null && result.equals(SendResult.OK)) {
- message.getAckInfo().setHasAck(true);
+ message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId,
dataTime, message.getMsgCnt(), message.getTotalSize());
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 1d19ffc7de..e2b0b06517 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
@@ -156,7 +157,7 @@ public class LogFileSource extends AbstractSource {
linePosition = getInitLineOffset(isIncrement, taskId, instanceId,
inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
- dataTime =
DateTransUtils.timeStrConvertTomillSec(profile.getSourceDataTime(),
+ dataTime =
DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(),
profile.get(TASK_CYCLE_UNIT));
try {
registerMeta(profile);
@@ -348,12 +349,13 @@ public class LogFileSource extends AbstractSource {
private Message createMessage(SourceData sourceData) {
String msgWithMetaData = fillMetaData(sourceData.data);
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
inlongStreamId,
- dataTime, 1, msgWithMetaData.length());
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.offset.toString());
+ header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
+ dataTime, 1, msgWithMetaData.length());
Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
index 8fb9755716..a846f699c7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
@@ -274,7 +274,7 @@ public class WatchEntity {
logger.info("removeUselessWatchDirectories {}", curDataTime);
/* Calculate the data time which is 3 cycle units earlier than current
task data time. */
- long curDataTimeMillis =
DateTransUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
+ long curDataTimeMillis =
DateTransUtils.timeStrConvertToMillSec(curDataTime, cycleUnit);
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(curDataTimeMillis);
if ("D".equalsIgnoreCase(cycleUnit)) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index b06478558d..62207acc81 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -124,7 +124,7 @@ public class NewDateUtils {
String retTime = DateTransUtils.millSecConvertToTimeStr(
System.currentTimeMillis(), cycleUnit);
try {
- long time = DateTransUtils.timeStrConvertTomillSec(dataTime,
cycleUnit);
+ long time = DateTransUtils.timeStrConvertToMillSec(dataTime,
cycleUnit);
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
@@ -592,8 +592,8 @@ public class NewDateUtils {
long startTime;
long endTime;
try {
- startTime = DateTransUtils.timeStrConvertTomillSec(start,
cycleUnit);
- endTime = DateTransUtils.timeStrConvertTomillSec(end, cycleUnit);
+ startTime = DateTransUtils.timeStrConvertToMillSec(start,
cycleUnit);
+ endTime = DateTransUtils.timeStrConvertToMillSec(end, cycleUnit);
} catch (ParseException e) {
logger.error("date format is error: ", e);
return ret;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index d7c03bf099..0fae339400 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
+import org.apache.inlong.agent.message.filecollect.OffsetAckInfo;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
@@ -50,8 +50,6 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import static org.awaitility.Awaitility.await;
-
@RunWith(PowerMockRunner.class)
@PrepareForTest(SenderManager.class)
@PowerMockIgnore({"javax.management.*"})
@@ -98,37 +96,45 @@ public class TestSenderManager {
Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.anyLong(), Mockito.any(),
Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean());
-
senderManager.Start();
- Long packageIndex = 0L;
- Long packageOffset = 100L;
- List<byte[]> bodyList = new ArrayList<>();
- bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
- Integer resultBatchSize = 0;
- for (int i = 0; i < bodyList.size(); i++) {
- resultBatchSize += bodyList.get(i).length;
- }
+ Long offset = 0L;
+ List<OffsetAckInfo> ackInfoListTotal = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- PackageAckInfo ackInfo = new PackageAckInfo(packageIndex++,
packageOffset, resultBatchSize, false);
+ List<byte[]> bodyList = new ArrayList<>();
+ List<OffsetAckInfo> ackInfoList = new ArrayList<>();
+ bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
+ for (int j = 0; j < bodyList.size(); j++) {
+ OffsetAckInfo ackInfo = new OffsetAckInfo(offset++,
bodyList.get(j).length, false);
+ ackInfoList.add(ackInfo);
+ ackInfoListTotal.add(ackInfo);
+ }
SenderMessage senderMessage = new SenderMessage("taskId",
"instanceId", "groupId", "streamId", bodyList,
- AgentUtils.getCurrentTime(), null, ackInfo);
+ AgentUtils.getCurrentTime(), null, ackInfoList);
senderManager.sendBatch(senderMessage);
- packageOffset += 100;
}
Assert.assertTrue(cbList.size() == 10);
for (int i = 0; i < 5; i++) {
cbList.get(4 - i).onMessageAck(SendResult.OK);
}
-
- await().atMost(2, TimeUnit.SECONDS).until(() ->
!senderManager.sendFinished());
+ Assert.assertTrue(calHasAckCount(ackInfoListTotal) == 5);
for (int i = 5; i < 10; i++) {
cbList.get(i).onMessageAck(SendResult.OK);
AgentUtils.silenceSleepInMs(10);
}
- await().atMost(2, TimeUnit.SECONDS).until(() ->
senderManager.sendFinished());
+
Assert.assertTrue(String.valueOf(calHasAckCount(ackInfoListTotal)),
calHasAckCount(ackInfoListTotal) == 10);
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue("testNormalAck failed", false);
}
}
+
+ private int calHasAckCount(List<OffsetAckInfo> ackInfoListTotal) {
+ int count = 0;
+ for (int i = 0; i < ackInfoListTotal.size(); i++) {
+ if (ackInfoListTotal.get(i).getHasAck()) {
+ count++;
+ }
+ }
+ return count;
+ }
}
\ No newline at end of file